a love letter to tangled (android, iOS, and a search API)
feat: use lightrail for backfilling repo index

+965 -336
+6 -100
README.md
````diff
···
 
 ## Development
 
-Use the top-level [`justfile`](justfile) for common workflows:
-
-```bash
-just dev
-just build
-just test
-just api-run-api
-```
+Use the top-level [`justfile`](justfile) for common workflows (`just --list` to view)
 
-The committed `apps/twisted/.env` points at production. Use `apps/twisted/.env.local`
-for machine-local overrides such as a localhost API or OAuth callback.
+Use `apps/twisted/.env.local` for machine-local overrides such as a localhost API or OAuth callback.
 
 ## Run Locally
 
···
 Dev builds keep the current OAuth flow available. Production builds are read-only
 and hide auth entry points for now.
 
-### Local API DB
-
-The experimental local API database lives at `packages/api/twister-dev.db`.
-Treat it as disposable unless you explicitly back it up.
-
-Operational rules:
-
-1. Stop the API before copying or restoring the file.
-2. Copy `twister-dev.db` and any matching `-wal` or `-shm` sidecars together.
-3. Prefer restore-or-rebuild over manual repair if the DB looks suspect.
-4. Let the file grow during experiments, then compact or delete it afterward.
-
-Useful local commands:
-
-```bash
-cd packages/api
-du -h twister-dev.db*
-ls -lh twister-dev.db*
-```
-
-## Infrastructure Setup
-
-### Turso
-
-Use one Turso database per environment, for example:
-
-- `twister-dev`
-- `twister-prod`
-
-Do not introduce separate app variable names for dev and prod. Always use the same variables:
-
-- `TURSO_DATABASE_URL`
-- `TURSO_AUTH_TOKEN`
-
-Only the values change per environment.
-
-Example:
-
-```bash
-# Development
-TURSO_DATABASE_URL=libsql://twister-dev-your-org.turso.io
-TURSO_AUTH_TOKEN=...
-
-# Production
-TURSO_DATABASE_URL=libsql://twister-prod-your-org.turso.io
-TURSO_AUTH_TOKEN=...
-```
-
-### Railway
+## Attributions
 
-Create or reuse one Railway project containing:
-
-- existing `tap`
-- `api` running `twister api`
-- `indexer` running `twister indexer`
-
-Set these shared variables on the Railway services:
-
-- `TURSO_DATABASE_URL`
-- `TURSO_AUTH_TOKEN`
-- `LOG_LEVEL`
-- `LOG_FORMAT`
-
-Set these API-specific variables:
-
-- `HTTP_BIND_ADDR`
-- `SEARCH_DEFAULT_LIMIT`
-- `SEARCH_MAX_LIMIT`
-- `ENABLE_ADMIN_ENDPOINTS`
-- `ADMIN_AUTH_TOKEN`
-- `READ_THROUGH_MODE`
-- `READ_THROUGH_COLLECTIONS`
-- `READ_THROUGH_MAX_ATTEMPTS`
-
-Set these indexer-specific variables:
-
-- `TAP_URL`
-- `TAP_AUTH_PASSWORD`
-- `INDEXED_COLLECTIONS`
-
-If you use separate Railway environments for dev and prod, keep the same variable names in both and only swap the Turso values.
-
-### First Bootstrap
-
-For a brand-new environment:
-
-1. Point `TURSO_DATABASE_URL` and `TURSO_AUTH_TOKEN` at the target database.
-2. Deploy `api` and `indexer` on Railway.
-3. Verify API readiness and indexer health.
-4. Run `twister backfill` with your seed file.
-5. Treat the environment as search-ready only after historical backfill completes.
+This project relies heavily on the work of the [Tangled team](https://tangled.org/tangled.org) (duh)
+and the infrastructure made available by [microcosm](https://microcosm.blue), specifically
+Lightrail and Constellation.
````
+1
docs/README.md
```diff
···
 - [`reference/app.md`](reference/app.md) — Ionic Vue mobile app
 - [`reference/deployment-walkthrough.md`](reference/deployment-walkthrough.md) — Railway deployment guide
 - [`reference/lexicons.md`](reference/lexicons.md) — Tangled AT Protocol record types
+- [`reference/metrics.md`](reference/metrics.md) — Railway and Turso usage checks after deploy
 - [`reference/resync.md`](reference/resync.md) — Backfill and repo-resync recovery playbook
 
 ## Specs
```
+10 -3
docs/reference/api.md
```diff
···
 
 ## Backfill
 
-The backfill command discovers users from a seed file and registers them with Tap for indexing. Discovery fans out via follow graphs and repo collaborators up to a configurable hop depth (default 2). Supports dry-run mode, configurable concurrency and batch sizes, and is idempotent.
+The backfill command now defaults to `--source lightrail`: it calls
+`com.atproto.sync.listReposByCollection`, dedupes returned DIDs, and batch
+submits them to Tap. `--source graph` keeps the older seed-file follow and
+collaborator crawl for targeted fallback runs.
 
 ## Configuration
 
···
 | `PLC_DIRECTORY_URL` | `https://plc.directory` | PLC Directory |
 | `XRPC_TIMEOUT` | 15s | XRPC HTTP timeout |
 
+Recommended production practice is to use explicit search-relevant collection
+lists for `INDEXED_COLLECTIONS` and `READ_THROUGH_COLLECTIONS`, not
+`sh.tangled.*`, and to leave `sh.tangled.graph.follow` out of both.
+
 ## Deployment
 
 Deployed on Railway with three services:
 
-- **api** — HTTP server (port 8080, health at `/healthz`)
-- **indexer** — Tap consumer (health at `:9090/healthz`)
+- **api** — HTTP server (port 8080, health at `/readyz`)
+- **indexer** — Tap consumer (health at `:9090/health`)
 - **tap** — Tap instance (external dependency)
 
 All services share the same Turso database. The API and indexer are separate deployments of the same binary with different subcommands.
```
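The Lightrail-backed discovery this diff documents is, at its core, cursor pagination over `com.atproto.sync.listReposByCollection` plus a first-seen dedupe of the returned DIDs. A minimal standalone sketch of that shape — the `page` type and `collectDIDs` helper here are illustrative stand-ins, not the repo's real client:

```go
package main

import "fmt"

// page models one response from com.atproto.sync.listReposByCollection:
// a slice of repo DIDs plus an opaque cursor ("" when exhausted).
type page struct {
	dids   []string
	cursor string
}

// collectDIDs drains a paginated source and returns the DIDs
// deduplicated in first-seen order. A production client would also
// cap iterations and guard against a repeated cursor.
func collectDIDs(fetch func(cursor string) page) []string {
	seen := make(map[string]bool)
	var out []string
	cursor := ""
	for {
		p := fetch(cursor)
		for _, did := range p.dids {
			if did == "" || seen[did] {
				continue
			}
			seen[did] = true
			out = append(out, did)
		}
		if p.cursor == "" {
			return out
		}
		cursor = p.cursor
	}
}

func main() {
	// Two fake pages with one overlapping DID across the boundary.
	pages := map[string]page{
		"":  {dids: []string{"did:plc:a", "did:plc:b"}, cursor: "1"},
		"1": {dids: []string{"did:plc:b", "did:plc:c"}, cursor: ""},
	}
	fmt.Println(collectDIDs(func(c string) page { return pages[c] }))
	// prints: [did:plc:a did:plc:b did:plc:c]
}
```

The dedupe matters because a DID can appear under several indexed collections; Tap registration only needs it once.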
+9 -16
docs/reference/deployment-walkthrough.md
````diff
···
 - a Railway account and the Railway CLI
 - a Turso database URL and auth token
 - a Tap URL and Tap auth password
-- a seed list for the first backfill run
 From this machine:
 
 ```sh
···
 - `SEARCH_DEFAULT_LIMIT=20`
 - `SEARCH_MAX_LIMIT=100`
 - `READ_THROUGH_MODE=missing`
-- `READ_THROUGH_COLLECTIONS=sh.tangled.*`
+- `READ_THROUGH_COLLECTIONS=<explicit search collection CSV>`
 - `READ_THROUGH_MAX_ATTEMPTS=5`
 - `ENABLE_ADMIN_ENDPOINTS=false`
 - `ADMIN_AUTH_TOKEN=<set this if admin routes are enabled>`
···
 - `INDEXER_HEALTH_ADDR=0.0.0.0:${{ PORT }}`
 - `TAP_URL=<your Tap URL>`
 - `TAP_AUTH_PASSWORD=<your Tap password>`
-- `INDEXED_COLLECTIONS=sh.tangled.*`
+- `INDEXED_COLLECTIONS=<matching explicit search collection CSV>`
 - `ENABLE_INGEST_ENRICHMENT=true`
+Do not use `sh.tangled.*` for those allowlists. Match the Lightrail-backed
+search collection set and leave `sh.tangled.graph.follow` out.
 Optional OAuth variables for a Railway-hosted web client metadata endpoint:
 - `OAUTH_CLIENT_ID`
 - `OAUTH_REDIRECT_URIS`
···
 3. Confirm the `api` domain returns `200` from `/readyz`.
 4. Confirm the `indexer` returns `200` from `/health`.
 5. Run the initial backfill against the same Turso and Tap environment.
-One simple way to run backfill from this machine is to use the same env values
-locally and execute:
+Use Railway shell so the command runs inside the live `indexer` environment:
 
 ```sh
-cd /Users/owais/Projects/Twisted/packages/api
-go run ./main.go backfill --seeds /path/to/seeds.txt
+cd /Users/owais/Projects/Twisted
+railway link # Select indexer service if prompted
+railway shell
+twister backfill --source lightrail
 ```
 
 Do not call the environment ready until that first backfill has completed.
···
 pnpm --dir apps/twisted build
 pnpm --dir apps/twisted exec cap sync
 ```
-
-## Operating Model
-
-This is the practical split:
-
-- Railway hosts the always-on backend
-- Turso stores indexed data
-- this machine, or CI, builds the mobile app and points it at Railway
-If you later want a Railway-hosted web frontend, add that as a separate service.
````
+126
docs/reference/metrics.md
````diff
+# Metrics To Watch
+
+Use this after deploying the Lightrail-backed backfill flow and detail-only
+read-through changes.
+
+## Goal
+
+Confirm that:
+
+- the API stops creating broad read-through churn during browse traffic
+- the indexer still keeps search current through Tap
+- bootstrap backfills become cheaper and more predictable
+
+## Railway
+
+Watch both `api` and `indexer` for 24 to 48 hours after deploy.
+
+### API service
+
+Expected direction:
+
+- lower average CPU
+- fewer latency spikes on browse-heavy endpoints
+- lower memory churn from fewer queued background jobs
+
+Useful checks:
+
+- CPU usage before and after deploy
+- memory usage before and after deploy
+- request latency for browse-heavy periods
+- restart count
+
+If this change is helping, the API should look flatter under normal browsing,
+especially when clients hit repo lists, issue lists, pull lists, or follows.
+
+### Indexer service
+
+Expected direction:
+
+- similar steady-state load during normal Tap ingest
+- shorter, more deliberate spikes only when `twister backfill` is run
+
+Useful checks:
+
+- CPU during normal operation
+- CPU during `twister backfill --source lightrail`
+- memory during backfill
+- restart count
+
+The indexer may still spike during an initial bootstrap. That is expected. The
+important change is that the API should stop causing constant incidental work.
+
+## Turso
+
+This is where the clearest savings should show up.
+
+Expected direction:
+
+- fewer write operations
+- fewer row updates in indexing job tables
+- lower write amplification from browse traffic
+
+Useful checks:
+
+- total row writes
+- total queries
+- write-heavy windows during normal app usage
+- latency on write statements if you have it
+
+The main reduction should come from no longer enqueueing whole list responses
+into `indexing_jobs` during browse requests.
+
+## Twister Admin Signals
+
+If admin endpoints are enabled, compare these before and after deploy:
+
+- `read_through.pending`
+- `read_through.processing`
+- `read_through.failed`
+- `read_through.dead_letter`
+- `read_through.last_processed_at`
+
+Healthy post-change behavior:
+
+- pending stays near zero most of the time
+- processing only bumps when detail pages fetch missing records
+- failed and dead-letter counts grow slowly, not continuously
+
+Relevant endpoint:
+
+```sh
+curl -H "Authorization: Bearer $ADMIN_AUTH_TOKEN" http://<api-host>/admin/status
+```
+
+## What To Compare
+
+Use the same day-of-week and similar traffic windows if possible.
+
+Good comparisons:
+
+- 24 hours before deploy vs 24 hours after deploy
+- one browse-heavy period before vs after
+- one bootstrap backfill run before vs after
+
+## Success Signals
+
+Treat the rollout as successful if most of these are true:
+
+- API CPU is lower or less spiky under normal browsing
+- Turso writes drop during browse-heavy traffic
+- read-through queue counts stay close to zero most of the time
+- backfill runs complete with fewer upstream calls and cleaner batching
+- search freshness still tracks Tap ingest without visible regressions
+
+## Failure Signals
+
+Investigate if you see any of these:
+
+- search misses rise after deploy
+- detail pages repeatedly enqueue the same records
+- `read_through.pending` grows and does not drain
+- indexer CPU stays elevated long after a bootstrap run
+- Turso writes do not drop despite the handler changes
+
+If that happens, inspect Tap coverage first, then spot-check whether operators
+ran `twister backfill --source lightrail` for the environment.
````
+15 -11
docs/reference/resync.md
````diff
 ---
 title: Backfill & Resync Playbook
-updated: 2026-03-25
+updated: 2026-03-26
 ---
 
 Twister's search index has three recovery paths. Choose based on what broke.
···
 
 ### `twister backfill`
 
-Discovers users via follow graph from seed DIDs/handles, checks Tap status for
-each, and registers untracked repos with Tap `/repos/add`.
+Defaults to `--source lightrail`: discovers DIDs from
+`com.atproto.sync.listReposByCollection` and submits them to Tap in batches.
+Use `--source graph` only for targeted fallback seeding from handles or DIDs.
 
 ```sh
-# dry-run first
-twister backfill --seeds seeds.txt --max-hops 2 --dry-run
+# full-network dry-run first
+twister backfill --dry-run
 
-# real run
-twister backfill --seeds seeds.txt --max-hops 2 \
+# full-network bootstrap
+twister backfill
+
+# targeted fallback
+twister backfill --source graph --seeds seeds.txt --max-hops 2 \
   --concurrency 5 --batch-size 10 --batch-delay 1s
 ```
 
-Safe to re-run. Discovery deduplicates and `repos/add` is idempotent.
+Safe to re-run. Discovery deduplicates and `repos/add` is treated as idempotent.
 
 ### `twister reindex`
 
···
 1. Check if the DID is tracked by Tap. If not, run `backfill`:
 
 ```sh
-twister backfill --seeds <handle-or-did> --max-hops 0
+twister backfill --source graph --seeds <handle-or-did> --max-hops 0
 ```
 
 2. Once Tap is tracking the DID, the `indexer` will deliver historical events.
···
 2. Register repos with Tap:
 
 ```sh
-twister backfill --seeds seeds.txt --max-hops 2 --dry-run
-twister backfill --seeds seeds.txt --max-hops 2
+twister backfill --dry-run
+twister backfill
 ```
 
 3. Start the indexer and let it consume: `twister indexer`
````
+1 -2
docs/todo.md
```diff
 ---
 title: Parking Lot
-updated: 2026-03-25
+updated: 2026-03-26
 ---
 
 Search stabilization is active roadmap work now, not parking-lot work.
···
 Still parked:
 
 - Semantic and hybrid search stay deferred until local-only storage, smoke tests, read-through indexing, and the JetStream cache are stable.
-- Revisit `com.atproto.sync.listReposByCollection` as a complementary backfill discovery source after the Tap-driven indexing path is reliable.
```
-10
packages/api/internal/api/actors.go
```diff
···
 	if err != nil {
 		return nil, fmt.Errorf("list repos for %s: %w", actor.DID, err)
 	}
-	s.enqueueXRPCList(r.Context(), entries)
 
 	for _, entry := range entries {
 		name, _ := entry.Value["name"].(string)
···
 		s.actorError(w, err)
 		return
 	}
-	s.enqueueXRPCList(r.Context(), entries)
 
 	records := make([]recordEntry, len(entries))
 	for i, e := range entries {
···
 		writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch issues"))
 		return
 	}
-	s.enqueueXRPCList(r.Context(), issues)
 
 	var records []issueEntry
 	for _, e := range issues {
···
 		writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch pulls"))
 		return
 	}
-	s.enqueueXRPCList(r.Context(), pulls)
 
 	var records []pullEntry
 	for _, e := range pulls {
···
 		writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch issues"))
 		return
 	}
-	s.enqueueXRPCList(r.Context(), issues)
 
 	records := make([]issueEntry, len(issues))
 	for i, e := range issues {
···
 		writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch pulls"))
 		return
 	}
-	s.enqueueXRPCList(r.Context(), pulls)
 
 	records := make([]pullEntry, len(pulls))
 	for i, e := range pulls {
···
 		writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch follows"))
 		return
 	}
-	s.enqueueXRPCList(r.Context(), entries)
 
 	records := make([]recordEntry, len(entries))
 	for i, e := range entries {
···
 		writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch strings"))
 		return
 	}
-	s.enqueueXRPCList(r.Context(), entries)
 
 	records := make([]recordEntry, len(entries))
 	for i, e := range entries {
···
 		writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch comments"))
 		return
 	}
-	s.enqueueXRPCList(r.Context(), entries)
 
 	var records []recordEntry
 	for _, e := range entries {
···
 		writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch comments"))
 		return
 	}
-	s.enqueueXRPCList(r.Context(), entries)
 
 	var records []recordEntry
 	for _, e := range entries {
```
+58 -12
packages/api/internal/api/readthrough_test.go
```diff
···
 	}
 }
 
-func TestHandleActorFollowingEnqueuesRecords(t *testing.T) {
+func TestHandleActorFollowingDoesNotEnqueueRecords(t *testing.T) {
 	var upstream *httptest.Server
 	upstream = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		switch {
···
 	if rec.Code != http.StatusOK {
 		t.Fatalf("status: got %d body=%s", rec.Code, rec.Body.String())
 	}
-	if len(st.jobs) != 1 {
-		t.Fatalf("expected one queued follow job, got %#v", st.jobs)
-	}
-	job := st.jobs["did:plc:alice|sh.tangled.graph.follow|1"]
-	if job == nil {
-		t.Fatalf("expected follow indexing job, got %#v", st.jobs)
+	if len(st.jobs) != 0 {
+		t.Fatalf("expected no queued follow jobs, got %#v", st.jobs)
 	}
 }
 
-func TestHandleActorReposEnqueuesRecords(t *testing.T) {
+func TestHandleActorReposDoesNotEnqueueRecords(t *testing.T) {
 	var upstream *httptest.Server
 	upstream = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		switch {
···
 	if rec.Code != http.StatusOK {
 		t.Fatalf("status: got %d body=%s", rec.Code, rec.Body.String())
 	}
+	if len(st.jobs) != 0 {
+		t.Fatalf("expected no queued repo jobs, got %#v", st.jobs)
+	}
+}
+
+func TestHandleGetActorEnqueuesRecord(t *testing.T) {
+	var upstream *httptest.Server
+	upstream = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch {
+		case r.URL.Path == "/xrpc/com.atproto.identity.resolveHandle":
+			_ = json.NewEncoder(w).Encode(map[string]string{"did": "did:plc:alice"})
+		case r.URL.Path == "/did%3Aplc%3Aalice" || r.URL.Path == "/did:plc:alice":
+			_ = json.NewEncoder(w).Encode(map[string]any{
+				"id":          "did:plc:alice",
+				"alsoKnownAs": []string{"at://alice.tangled.org"},
+				"service": []map[string]string{{
+					"type": "AtprotoPersonalDataServer", "serviceEndpoint": upstream.URL,
+				}},
+			})
+		case r.URL.Path == "/xrpc/com.atproto.repo.getRecord":
+			_ = json.NewEncoder(w).Encode(map[string]any{
+				"uri": "at://did:plc:alice/sh.tangled.actor.profile/self",
+				"cid": "cid-1",
+				"value": map[string]any{
+					"$type":       "sh.tangled.actor.profile",
+					"description": "hello",
+				},
+			})
+		default:
+			http.NotFound(w, r)
+		}
+	}))
+	defer upstream.Close()
+
+	client := xrpc.NewClient(
+		xrpc.WithHTTPClient(upstream.Client()),
+		xrpc.WithIdentityService(upstream.URL),
+		xrpc.WithPLCDirectory(upstream.URL),
+	)
+	st := newAPITestStore()
+	srv := newAPITestServer(st, client)
+	mux := http.NewServeMux()
+	mux.HandleFunc("GET /actors/{handle}", srv.handleGetActor)
+
+	req := httptest.NewRequest(http.MethodGet, "/actors/alice.tangled.org", nil)
+	rec := httptest.NewRecorder()
+	mux.ServeHTTP(rec, req)
+
+	if rec.Code != http.StatusOK {
+		t.Fatalf("status: got %d body=%s", rec.Code, rec.Body.String())
+	}
 	if len(st.jobs) != 1 {
-		t.Fatalf("expected one queued repo job, got %#v", st.jobs)
+		t.Fatalf("expected one queued profile job, got %#v", st.jobs)
 	}
-	job := st.jobs["did:plc:alice|sh.tangled.repo|repo1"]
-	if job == nil {
-		t.Fatalf("expected repo indexing job, got %#v", st.jobs)
+	if st.jobs["did:plc:alice|sh.tangled.actor.profile|self"] == nil {
+		t.Fatalf("expected profile indexing job, got %#v", st.jobs)
 	}
 }
```
+187 -58
packages/api/internal/backfill/backfill.go
··· 5 5 "fmt" 6 6 "log/slog" 7 7 "sort" 8 + "strings" 8 9 "sync" 9 10 "time" 10 11 ··· 20 21 21 22 // Runner executes seed resolution, graph discovery, and Tap registration. 22 23 type Runner struct { 23 - store discoveryStore 24 - tap tapAdmin 25 - resolver handleResolver 26 - follows followFetcher 27 - profiles profileFetcher 28 - log *slog.Logger 24 + store discoveryStore 25 + tap tapAdmin 26 + resolver handleResolver 27 + follows followFetcher 28 + profiles profileFetcher 29 + lightrail lightrailRepoLister 30 + log *slog.Logger 29 31 } 30 32 31 33 func NewRunner(store discoveryStore, tap tapAdmin, xrpcClient *xrpc.Client, log *slog.Logger) *Runner { ··· 34 36 NewXRPCHandleResolver(xrpcClient), 35 37 NewXRPCFollowFetcher(xrpcClient), 36 38 NewXRPCProfileFetcher(xrpcClient), 39 + NewHTTPLightrailClient(), 37 40 log, 38 41 ) 39 42 } 40 43 41 - func NewRunnerWithDeps(store discoveryStore, tap tapAdmin, resolver handleResolver, follows followFetcher, profiles profileFetcher, log *slog.Logger) *Runner { 44 + func NewRunnerWithDeps( 45 + store discoveryStore, tap tapAdmin, resolver handleResolver, 46 + follows followFetcher, profiles profileFetcher, lightrail lightrailRepoLister, 47 + log *slog.Logger, 48 + ) *Runner { 42 49 if log == nil { 43 50 log = slog.Default() 44 51 } 45 - return &Runner{store: store, tap: tap, resolver: resolver, follows: follows, profiles: profiles, log: log} 52 + if lightrail == nil { 53 + lightrail = NewHTTPLightrailClient() 54 + } 55 + return &Runner{ 56 + store: store, tap: tap, resolver: resolver, follows: follows, 57 + profiles: profiles, lightrail: lightrail, log: log, 58 + } 46 59 } 47 60 48 61 func (r *Runner) Run(ctx context.Context, opts Options) error { 49 - if opts.SeedsPath == "" { 50 - return fmt.Errorf("--seeds is required") 51 - } 52 - if opts.MaxHops < 0 { 53 - return fmt.Errorf("--max-hops must be >= 0") 54 - } 55 62 if opts.Concurrency <= 0 { 56 63 opts.Concurrency = 5 57 64 } ··· 61 68 if opts.BatchDelay < 0 { 62 69 
return fmt.Errorf("--batch-delay must be >= 0") 63 70 } 71 + if opts.PageLimit <= 0 { 72 + opts.PageLimit = DefaultPageLimit 73 + } 74 + if strings.TrimSpace(opts.LightrailURL) == "" { 75 + opts.LightrailURL = DefaultLightrailURL 76 + } 77 + 78 + switch normalizeSource(opts.Source) { 79 + case SourceLightrail: 80 + return r.runLightrail(ctx, opts) 81 + case SourceGraph: 82 + return r.runGraph(ctx, opts) 83 + default: 84 + return fmt.Errorf("unsupported --source %q", opts.Source) 85 + } 86 + } 87 + 88 + func (r *Runner) runGraph(ctx context.Context, opts Options) error { 89 + if opts.SeedsPath == "" { 90 + return fmt.Errorf("--seeds is required for --source graph") 91 + } 92 + if opts.MaxHops < 0 { 93 + return fmt.Errorf("--max-hops must be >= 0") 94 + } 64 95 65 96 seedEntries, err := parseSeedInput(opts.SeedsPath) 66 97 if err != nil { ··· 75 106 } 76 107 77 108 r.log.Info("starting backfill discovery", 109 + slog.String("source", SourceGraph), 78 110 slog.Int("seed_count", len(seeds)), 79 111 slog.Int("max_hops", opts.MaxHops), 80 112 slog.Int("concurrency", opts.Concurrency), ··· 123 155 slog.Int("to_submit", len(toSubmit)), 124 156 ) 125 157 126 - submitted := 0 127 - submitFailures := 0 128 - for i := 0; i < len(toSubmit); i += opts.BatchSize { 129 - end := i + opts.BatchSize 130 - if end > len(toSubmit) { 131 - end = len(toSubmit) 132 - } 133 - batch := toSubmit[i:end] 134 - if err := r.tap.AddRepos(ctx, batch); err != nil { 135 - r.log.Warn("tap batch submission failed", 136 - slog.Int("batch_start", i), 137 - slog.Int("batch_end", end), 138 - slog.Int("batch_size", len(batch)), 139 - slog.String("error", err.Error()), 140 - ) 141 - for _, did := range batch { 142 - if err := r.tap.AddRepos(ctx, []string{did}); err != nil { 143 - submitFailures++ 144 - r.log.Warn("tap repo submission failed", 145 - slog.String("did", did), 146 - slog.String("error", err.Error()), 147 - ) 148 - continue 149 - } 150 - submitted++ 151 - r.log.Info("submitted Tap repo", 
slog.String("did", did), slog.Int("submitted_total", submitted)) 152 - } 153 - } else { 154 - submitted += len(batch) 155 - r.log.Info("submitted Tap batch", 156 - slog.Int("batch_start", i), 157 - slog.Int("batch_end", end), 158 - slog.Int("batch_size", len(batch)), 159 - slog.Int("submitted_total", submitted), 160 - ) 161 - } 162 - if end < len(toSubmit) && opts.BatchDelay > 0 { 163 - select { 164 - case <-ctx.Done(): 165 - return ctx.Err() 166 - case <-time.After(opts.BatchDelay): 167 - } 168 - } 169 - } 158 + submitted, submitFailures := r.submitDIDs(ctx, toSubmit, opts.BatchSize, opts.BatchDelay) 170 159 171 160 r.log.Info("backfill complete", 161 + slog.String("source", SourceGraph), 172 162 slog.Int("discovered_total", len(discovered)), 173 163 slog.Int("already_tracked", alreadyTracked), 174 164 slog.Int("backfill_in_progress", inProgress), ··· 184 174 return nil 185 175 } 186 176 177 + func (r *Runner) runLightrail(ctx context.Context, opts Options) error { 178 + collections := normalizeCollections(opts.Collections) 179 + if len(collections) == 0 { 180 + collections = append([]string(nil), DefaultCollections...) 
181 + } 182 + 183 + r.log.Info("starting backfill discovery", 184 + slog.String("source", SourceLightrail), 185 + slog.String("lightrail_url", opts.LightrailURL), 186 + slog.Int("collection_count", len(collections)), 187 + slog.Int("page_limit", opts.PageLimit), 188 + ) 189 + 190 + dids, err := r.lightrail.ListReposByCollection( 191 + ctx, opts.LightrailURL, collections, opts.PageLimit, 192 + ) 193 + if err != nil { 194 + return fmt.Errorf("discover repos from lightrail: %w", err) 195 + } 196 + dids = normalizeDIDs(dids) 197 + sort.Strings(dids) 198 + discovered := make([]DiscoveredUser, 0, len(dids)) 199 + for _, did := range dids { 200 + discovered = append(discovered, DiscoveredUser{ 201 + DID: did, Hop: 0, Source: strings.Join(collections, ","), Reason: "collection", 202 + }) 203 + } 204 + 205 + r.log.Info("discovery complete", 206 + slog.String("source", SourceLightrail), 207 + slog.Int("discovered_total", len(discovered)), 208 + ) 209 + if opts.DryRun { 210 + r.log.Info("dry-run mode enabled; skipping Tap mutations") 211 + return nil 212 + } 213 + 214 + submitted, submitFailures := r.submitDIDs(ctx, dids, opts.BatchSize, opts.BatchDelay) 215 + r.log.Info("backfill complete", 216 + slog.String("source", SourceLightrail), 217 + slog.Int("discovered_total", len(discovered)), 218 + slog.Int("submitted", submitted), 219 + slog.Int("submit_failures", submitFailures), 220 + ) 221 + return nil 222 + } 223 + 187 224 // resolveSeeds returns (dids, did→handle map, error). The handle map contains 188 225 // entries for seeds that were specified as handles rather than DIDs. 
189 226 func (r *Runner) resolveSeeds(ctx context.Context, entries []seedEntry) ([]string, map[string]string, error) { ··· 317 354 } 318 355 319 356 return ordered, nil 357 + } 358 + 359 + func (r *Runner) submitDIDs( 360 + ctx context.Context, dids []string, batchSize int, batchDelay time.Duration, 361 + ) (int, int) { 362 + submitted := 0 363 + submitFailures := 0 364 + for i := 0; i < len(dids); i += batchSize { 365 + end := i + batchSize 366 + if end > len(dids) { 367 + end = len(dids) 368 + } 369 + batch := dids[i:end] 370 + if err := r.tap.AddRepos(ctx, batch); err != nil { 371 + r.log.Warn("tap batch submission failed", 372 + slog.Int("batch_start", i), 373 + slog.Int("batch_end", end), 374 + slog.Int("batch_size", len(batch)), 375 + slog.String("error", err.Error()), 376 + ) 377 + for _, did := range batch { 378 + if err := r.tap.AddRepos(ctx, []string{did}); err != nil { 379 + submitFailures++ 380 + r.log.Warn("tap repo submission failed", 381 + slog.String("did", did), 382 + slog.String("error", err.Error()), 383 + ) 384 + continue 385 + } 386 + submitted++ 387 + r.log.Info("submitted Tap repo", 388 + slog.String("did", did), 389 + slog.Int("submitted_total", submitted), 390 + ) 391 + } 392 + } else { 393 + submitted += len(batch) 394 + r.log.Info("submitted Tap batch", 395 + slog.Int("batch_start", i), 396 + slog.Int("batch_end", end), 397 + slog.Int("batch_size", len(batch)), 398 + slog.Int("submitted_total", submitted), 399 + ) 400 + } 401 + if end < len(dids) && batchDelay > 0 { 402 + select { 403 + case <-ctx.Done(): 404 + return submitted, submitFailures 405 + case <-time.After(batchDelay): 406 + } 407 + } 408 + } 409 + return submitted, submitFailures 410 + } 411 + 412 + func normalizeSource(source string) string { 413 + switch strings.ToLower(strings.TrimSpace(source)) { 414 + case "", SourceLightrail: 415 + return SourceLightrail 416 + case SourceGraph: 417 + return SourceGraph 418 + default: 419 + return 
strings.ToLower(strings.TrimSpace(source)) 420 + } 421 + } 422 + 423 + func normalizeCollections(collections []string) []string { 424 + seen := make(map[string]bool) 425 + normalized := make([]string, 0, len(collections)) 426 + for _, collection := range collections { 427 + collection = strings.TrimSpace(collection) 428 + if collection == "" || seen[collection] { 429 + continue 430 + } 431 + seen[collection] = true 432 + normalized = append(normalized, collection) 433 + } 434 + return normalized 435 + } 436 + 437 + func normalizeDIDs(dids []string) []string { 438 + seen := make(map[string]bool) 439 + normalized := make([]string, 0, len(dids)) 440 + for _, did := range dids { 441 + did = strings.TrimSpace(did) 442 + if did == "" || seen[did] { 443 + continue 444 + } 445 + seen[did] = true 446 + normalized = append(normalized, did) 447 + } 448 + return normalized 320 449 } 321 450 322 451 // indexProfiles fetches sh.tangled.actor.profile records via XRPC for each
+124 -8
packages/api/internal/backfill/backfill_test.go
```diff
···
 	return &ProfileRecord{}, nil
 }
 
+type fakeLightrailRepoLister struct {
+	dids        []string
+	err         error
+	calls       int
+	baseURL     string
+	collections []string
+	limit       int
+}
+
+func (f *fakeLightrailRepoLister) ListReposByCollection(
+	_ context.Context, baseURL string, collections []string, limit int,
+) ([]string, error) {
+	f.calls++
+	f.baseURL = baseURL
+	f.collections = append([]string(nil), collections...)
+	f.limit = limit
+	if f.err != nil {
+		return nil, f.err
+	}
+	return append([]string(nil), f.dids...), nil
+}
+
 func TestRunner_DiscoveryAndSubmit(t *testing.T) {
 	st := &fakeStore{
 		collaborators: map[string][]string{
···
 	tap := &fakeTapAdmin{statuses: map[string]RepoStatus{"did:plc:f1": {Found: true, Tracked: true, Backfilled: true}}}
 	resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}}
 	log := slog.New(slog.NewTextHandler(io.Discard, nil))
-	r := NewRunnerWithDeps(st, tap, resolver, follows, &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, log)
+	r := NewRunnerWithDeps(
+		st, tap, resolver, follows,
+		&fakeProfileFetcher{profiles: map[string]*ProfileRecord{}},
+		&fakeLightrailRepoLister{}, log,
+	)
 
 	dir := t.TempDir()
 	seedsPath := filepath.Join(dir, "seeds.txt")
···
 		MaxHops:     1,
 		Concurrency: 2,
 		BatchSize:   2,
+		Source:      SourceGraph,
 	})
 	if err != nil {
 		t.Fatalf("run backfill: %v", err)
···
 	tap := &fakeTapAdmin{statuses: map[string]RepoStatus{}}
 	resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}}
 	log := slog.New(slog.NewTextHandler(io.Discard, nil))
-	r := NewRunnerWithDeps(st, tap, resolver, follows, &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, log)
+	r := NewRunnerWithDeps(
+		st, tap, resolver, follows,
+		&fakeProfileFetcher{profiles: map[string]*ProfileRecord{}},
+		&fakeLightrailRepoLister{}, log,
+	)
 
 	dir := t.TempDir()
 	seedsPath := filepath.Join(dir, "seeds.txt")
···
 		DryRun:      true,
 		Concurrency: 1,
 		BatchSize:   10,
+		Source:      SourceGraph,
 	})
 	if err != nil {
 		t.Fatalf("run dry-run backfill: %v", err)
···
 	}}
 	resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}}
 	log := slog.New(slog.NewTextHandler(io.Discard, nil))
-	r := NewRunnerWithDeps(st, tap, resolver, follows, &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, log)
+	r := NewRunnerWithDeps(
+		st, tap, resolver, follows,
+		&fakeProfileFetcher{profiles: map[string]*ProfileRecord{}},
+		&fakeLightrailRepoLister{}, log,
+	)
 
 	dir := t.TempDir()
 	seedsPath := filepath.Join(dir, "seeds.txt")
···
 		t.Fatalf("write seeds: %v", err)
 	}
 
-	err := r.Run(context.Background(), Options{SeedsPath: seedsPath, MaxHops: 0})
+	err := r.Run(context.Background(), Options{
+		SeedsPath: seedsPath, MaxHops: 0, Source: SourceGraph,
+	})
 	if err != nil {
 		t.Fatalf("run backfill: %v", err)
 	}
···
 	}
 	resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}}
 	log := slog.New(slog.NewTextHandler(io.Discard, nil))
-	r := NewRunnerWithDeps(st, tap, resolver, follows, &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, log)
+	r := NewRunnerWithDeps(
+		st, tap, resolver, follows,
+		&fakeProfileFetcher{profiles: map[string]*ProfileRecord{}},
+		&fakeLightrailRepoLister{}, log,
+	)
 
 	dir := t.TempDir()
 	seedsPath := filepath.Join(dir, "seeds.txt")
···
 		MaxHops:     1,
 		Concurrency: 1,
 		BatchSize:   10,
+		Source:      SourceGraph,
 	})
 	if
```
err != nil { 216 259 t.Fatalf("run backfill: %v", err) ··· 250 293 } 251 294 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 252 295 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 253 - r := NewRunnerWithDeps(st, tap, resolver, follows, &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, log) 296 + r := NewRunnerWithDeps( 297 + st, tap, resolver, follows, 298 + &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, 299 + &fakeLightrailRepoLister{}, log, 300 + ) 254 301 255 302 dir := t.TempDir() 256 303 seedsPath := filepath.Join(dir, "seeds.txt") ··· 263 310 MaxHops: 1, 264 311 Concurrency: 1, 265 312 BatchSize: 10, 313 + Source: SourceGraph, 266 314 }) 267 315 if err != nil { 268 316 t.Fatalf("run backfill: %v", err) ··· 297 345 }, 298 346 }} 299 347 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 300 - r := NewRunnerWithDeps(st, tap, resolver, follows, profiles, log) 348 + r := NewRunnerWithDeps( 349 + st, tap, resolver, follows, profiles, &fakeLightrailRepoLister{}, log, 350 + ) 301 351 302 352 dir := t.TempDir() 303 353 seedsPath := filepath.Join(dir, "seeds.txt") ··· 305 355 t.Fatalf("write seeds: %v", err) 306 356 } 307 357 308 - err := r.Run(context.Background(), Options{SeedsPath: seedsPath, MaxHops: 0}) 358 + err := r.Run(context.Background(), Options{ 359 + SeedsPath: seedsPath, MaxHops: 0, Source: SourceGraph, 360 + }) 309 361 if err != nil { 310 362 t.Fatalf("run backfill: %v", err) 311 363 } ··· 334 386 t.Errorf("expected summary to contain location, got %q", doc.Summary) 335 387 } 336 388 } 389 + 390 + func TestRunner_LightrailDryRunSkipsTapAndProfileIndexing(t *testing.T) { 391 + st := &fakeStore{collaborators: map[string][]string{}} 392 + tap := &fakeTapAdmin{ 393 + statusErrs: map[string]error{"did:plc:a": errors.New("should not be called")}, 394 + } 395 + lightrail := &fakeLightrailRepoLister{dids: []string{"did:plc:b", "did:plc:a"}} 396 + log := 
slog.New(slog.NewTextHandler(io.Discard, nil)) 397 + r := NewRunnerWithDeps( 398 + st, tap, &fakeResolver{}, &fakeFollowFetcher{}, &fakeProfileFetcher{}, 399 + lightrail, log, 400 + ) 401 + 402 + err := r.Run(context.Background(), Options{ 403 + Source: SourceLightrail, DryRun: true, 404 + LightrailURL: "https://example.test", PageLimit: 500, 405 + }) 406 + if err != nil { 407 + t.Fatalf("run lightrail dry-run: %v", err) 408 + } 409 + if len(tap.added) != 0 { 410 + t.Fatalf("expected no Tap submissions, got %#v", tap.added) 411 + } 412 + if len(st.documents) != 0 { 413 + t.Fatalf("expected no profile indexing, got %#v", st.documents) 414 + } 415 + if lightrail.calls != 1 { 416 + t.Fatalf("expected one Lightrail call, got %d", lightrail.calls) 417 + } 418 + if len(lightrail.collections) != len(DefaultCollections) { 419 + t.Fatalf("expected default collections, got %#v", lightrail.collections) 420 + } 421 + } 422 + 423 + func TestRunner_LightrailSubmitsWithoutRepoStatusChecks(t *testing.T) { 424 + st := &fakeStore{collaborators: map[string][]string{}} 425 + tap := &fakeTapAdmin{ 426 + statusErrs: map[string]error{ 427 + "did:plc:a": errors.New("RepoStatus should not be called in lightrail mode"), 428 + }, 429 + } 430 + lightrail := &fakeLightrailRepoLister{ 431 + dids: []string{"did:plc:b", "did:plc:a", "did:plc:b"}, 432 + } 433 + log := slog.New(slog.NewTextHandler(io.Discard, nil)) 434 + r := NewRunnerWithDeps( 435 + st, tap, &fakeResolver{}, &fakeFollowFetcher{}, &fakeProfileFetcher{}, 436 + lightrail, log, 437 + ) 438 + 439 + err := r.Run(context.Background(), Options{ 440 + Source: SourceLightrail, BatchSize: 10, 441 + Collections: []string{"sh.tangled.repo"}, 442 + }) 443 + if err != nil { 444 + t.Fatalf("run lightrail backfill: %v", err) 445 + } 446 + if len(tap.added) != 1 { 447 + t.Fatalf("expected one Tap batch, got %#v", tap.added) 448 + } 449 + if len(tap.added[0]) != 2 { 450 + t.Fatalf("expected deduped DIDs, got %#v", tap.added) 451 + } 452 + }
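The lightrail tests above assert that deduped DIDs arrive in Tap batches capped by `BatchSize`. The chunking behavior they exercise can be sketched in isolation (an illustrative helper under assumed semantics, not the runner's actual submission code):

```go
package main

import "fmt"

// chunk splits discovered DIDs into /repos/add batches of at most n DIDs,
// mirroring what the BatchSize option controls. Sketch only: the real
// runner interleaves this with batch delays and Tap calls.
func chunk(dids []string, n int) [][]string {
	if n <= 0 {
		n = 1 // guard against an unset batch size
	}
	var batches [][]string
	for len(dids) > 0 {
		end := n
		if end > len(dids) {
			end = len(dids)
		}
		batches = append(batches, dids[:end])
		dids = dids[end:]
	}
	return batches
}

func main() {
	batches := chunk([]string{"did:plc:a", "did:plc:b", "did:plc:c"}, 2)
	fmt.Println(len(batches), batches) // 2 [[did:plc:a did:plc:b] [did:plc:c]]
}
```

With `BatchSize: 10` and two deduped DIDs, as in `TestRunner_LightrailSubmitsWithoutRepoStatusChecks`, this yields a single batch of two.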
+18 -15
packages/api/internal/backfill/doc.go
··· 1 - // Package backfill provides graph bootstrap tooling for Twister. 1 + // Package backfill provides Tap bootstrap tooling for Twister. 2 2 // 3 3 // # Backfill Runbook 4 4 // 5 - // This runbook covers initial graph bootstrap and repeat runs using: 5 + // This runbook covers initial bootstrap and repeat runs using: 6 6 // 7 7 // twister backfill 8 8 // 9 - // # Seeds Input 9 + // `--source lightrail` is the default and discovers DIDs from 10 + // com.atproto.sync.listReposByCollection. `--source graph` keeps the older 11 + // handle/DID seed crawl for targeted fallback runs. 10 12 // 11 - // The `--seeds` flag supports either of these forms: 13 + // # Graph Seeds Input 14 + // 15 + // The `--seeds` flag applies only to `--source graph` and supports either of 16 + // these forms: 12 17 // 13 18 // 1. File path: 14 19 // ··· 39 44 // 40 45 // # First Bootstrap 41 46 // 42 - // 1. Copy and customize seeds: 47 + // 1. Run full-network dry-run: 43 48 // 44 - // cp docs/api/seeds.txt /tmp/twister-seeds.txt 49 + // twister backfill --dry-run 45 50 // 46 - // 2. Run dry-run first: 51 + // 2. Run real bootstrap: 47 52 // 48 - // twister backfill --seeds /tmp/twister-seeds.txt --max-hops 2 --dry-run 53 + // twister backfill 49 54 // 50 - // 3. Run real backfill: 55 + // 3. Use graph mode only for targeted fallback: 51 56 // 52 - // twister backfill --seeds /tmp/twister-seeds.txt --max-hops 2 --concurrency 5 --batch-size 10 --batch-delay 1s 57 + // twister backfill --source graph --seeds /tmp/twister-seeds.txt --max-hops 2 53 58 // 54 - // Watch logs for seed count, hop-level discoveries, already-tracked vs submitted 55 - // users, and batch progress totals. 59 + // Watch logs for discovery totals and Tap submission progress. 56 60 // 57 61 // # Repeat Run 58 62 // 59 - // Append new candidate users to the seed source, run dry-run, then run the real 60 - // command again. 
Reruns are safe because discovery deduplicates in-memory and 61 - // Tap /repos/add is treated as idempotent. 63 + // Re-run `twister backfill` whenever you need to reseed the authoritative Tap 64 + // corpus. Append graph seeds only when using `--source graph`. 62 65 // 63 66 // # Dry-Run Safety 64 67 //
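The runbook's "lightrail is the default, graph is the fallback" rule can be sketched as a small dispatch helper (hypothetical code for illustration; `normalizeSource` is not a name from this change, and the real validation lives in flag parsing and `Runner.Run`):

```go
package main

import (
	"fmt"
	"strings"
)

// Source names matching the runbook's two modes.
const (
	SourceLightrail = "lightrail"
	SourceGraph     = "graph"
)

// normalizeSource applies the documented default: an empty --source flag
// means lightrail, graph is the explicit fallback, anything else errors.
func normalizeSource(flag string) (string, error) {
	switch strings.TrimSpace(flag) {
	case "", SourceLightrail:
		return SourceLightrail, nil
	case SourceGraph:
		return SourceGraph, nil
	default:
		return "", fmt.Errorf("unknown backfill source %q", flag)
	}
}

func main() {
	for _, flag := range []string{"", "graph", "typo"} {
		src, err := normalizeSource(flag)
		fmt.Printf("%q -> %q err=%v\n", flag, src, err)
	}
}
```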
+124
packages/api/internal/backfill/lightrail.go
··· 1 + package backfill 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "io" 8 + "net/http" 9 + "net/url" 10 + "strings" 11 + "time" 12 + ) 13 + 14 + const listReposByCollectionNSID = "com.atproto.sync.listReposByCollection" 15 + 16 + type lightrailRepoLister interface { 17 + ListReposByCollection( 18 + ctx context.Context, baseURL string, collections []string, limit int, 19 + ) ([]string, error) 20 + } 21 + 22 + type listReposByCollectionResponse struct { 23 + Cursor string `json:"cursor"` 24 + Repos []listReposByCollection `json:"repos"` 25 + } 26 + 27 + type listReposByCollection struct { 28 + DID string `json:"did"` 29 + } 30 + 31 + type HTTPLightrailClient struct { 32 + client *http.Client 33 + } 34 + 35 + func NewHTTPLightrailClient() *HTTPLightrailClient { 36 + return &HTTPLightrailClient{ 37 + client: &http.Client{Timeout: 15 * time.Second}, 38 + } 39 + } 40 + 41 + func (c *HTTPLightrailClient) ListReposByCollection( 42 + ctx context.Context, baseURL string, collections []string, limit int, 43 + ) ([]string, error) { 44 + baseURL = strings.TrimRight(strings.TrimSpace(baseURL), "/") 45 + if baseURL == "" { 46 + return nil, fmt.Errorf("lightrail url is required") 47 + } 48 + if limit <= 0 { 49 + limit = DefaultPageLimit 50 + } 51 + 52 + seen := make(map[string]bool) 53 + dids := make([]string, 0) 54 + cursor := "" 55 + for { 56 + resp, err := c.listReposByCollectionPage(ctx, baseURL, collections, limit, cursor) 57 + if err != nil { 58 + return nil, err 59 + } 60 + for _, repo := range resp.Repos { 61 + did := strings.TrimSpace(repo.DID) 62 + if did == "" || seen[did] { 63 + continue 64 + } 65 + seen[did] = true 66 + dids = append(dids, did) 67 + } 68 + if resp.Cursor == "" { 69 + return dids, nil 70 + } 71 + if resp.Cursor == cursor { 72 + return nil, fmt.Errorf("listReposByCollection repeated cursor %q", cursor) 73 + } 74 + cursor = resp.Cursor 75 + } 76 + } 77 + 78 + func (c *HTTPLightrailClient) listReposByCollectionPage( 79 + ctx 
context.Context, baseURL string, collections []string, limit int, cursor string, 80 + ) (*listReposByCollectionResponse, error) { 81 + params := url.Values{} 82 + for _, collection := range collections { 83 + collection = strings.TrimSpace(collection) 84 + if collection != "" { 85 + params.Add("collection", collection) 86 + } 87 + } 88 + if limit > 0 { 89 + params.Set("limit", fmt.Sprintf("%d", limit)) 90 + } 91 + if cursor != "" { 92 + params.Set("cursor", cursor) 93 + } 94 + 95 + endpoint := baseURL + "/xrpc/" + listReposByCollectionNSID 96 + if encoded := params.Encode(); encoded != "" { 97 + endpoint += "?" + encoded 98 + } 99 + req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) 100 + if err != nil { 101 + return nil, fmt.Errorf("build listReposByCollection request: %w", err) 102 + } 103 + 104 + resp, err := c.client.Do(req) 105 + if err != nil { 106 + return nil, fmt.Errorf("listReposByCollection request: %w", err) 107 + } 108 + defer resp.Body.Close() 109 + 110 + if resp.StatusCode < 200 || resp.StatusCode >= 300 { 111 + body, _ := io.ReadAll(io.LimitReader(resp.Body, 512)) 112 + return nil, fmt.Errorf( 113 + "listReposByCollection failed: status %d: %s", 114 + resp.StatusCode, 115 + strings.TrimSpace(string(body)), 116 + ) 117 + } 118 + 119 + var payload listReposByCollectionResponse 120 + if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil { 121 + return nil, fmt.Errorf("decode listReposByCollection response: %w", err) 122 + } 123 + return &payload, nil 124 + }
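The client's pagination loop above deduplicates DIDs across pages and bails out if the server returns the same cursor twice. That loop can be replayed over in-memory pages to make the invariants explicit (a sketch of the HTTP behavior, not the real client):

```go
package main

import "fmt"

// page mirrors one listReposByCollection response: a batch of DIDs plus a
// cursor for the next page; an empty cursor ends pagination.
type page struct {
	cursor string
	dids   []string
}

// drainPages walks pages keyed by cursor the same way the client does:
// skip blank or already-seen DIDs, stop on an empty cursor, and fail fast
// on a repeated cursor to avoid looping forever.
func drainPages(pages map[string]page) ([]string, error) {
	seen := make(map[string]bool)
	var dids []string
	cursor := ""
	for {
		p, ok := pages[cursor]
		if !ok {
			return nil, fmt.Errorf("no page for cursor %q", cursor)
		}
		for _, did := range p.dids {
			if did == "" || seen[did] {
				continue
			}
			seen[did] = true
			dids = append(dids, did)
		}
		if p.cursor == "" {
			return dids, nil
		}
		if p.cursor == cursor {
			return nil, fmt.Errorf("repeated cursor %q", cursor)
		}
		cursor = p.cursor
	}
}

func main() {
	pages := map[string]page{
		"":   {cursor: "p2", dids: []string{"did:plc:a", "did:plc:b"}},
		"p2": {cursor: "", dids: []string{"did:plc:b", "did:plc:c"}},
	}
	dids, err := drainPages(pages)
	fmt.Println(dids, err) // [did:plc:a did:plc:b did:plc:c] <nil>
}
```

This matches the overlap case in `TestHTTPLightrailClientListReposByCollectionPaginatesAndDedupes`: `did:plc:b` appears on both pages but is emitted once.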
+123
packages/api/internal/backfill/lightrail_test.go
··· 1 + package backfill 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "net/http" 7 + "net/http/httptest" 8 + "net/url" 9 + "reflect" 10 + "testing" 11 + ) 12 + 13 + func TestHTTPLightrailClientListReposByCollectionSinglePage(t *testing.T) { 14 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 15 + if r.URL.Path != "/xrpc/"+listReposByCollectionNSID { 16 + http.NotFound(w, r) 17 + return 18 + } 19 + _ = json.NewEncoder(w).Encode(map[string]any{ 20 + "repos": []map[string]string{ 21 + {"did": "did:plc:a"}, 22 + {"did": "did:plc:b"}, 23 + }, 24 + }) 25 + })) 26 + defer srv.Close() 27 + 28 + client := NewHTTPLightrailClient() 29 + dids, err := client.ListReposByCollection( 30 + context.Background(), srv.URL, []string{"sh.tangled.repo"}, 100, 31 + ) 32 + if err != nil { 33 + t.Fatalf("list repos: %v", err) 34 + } 35 + 36 + want := []string{"did:plc:a", "did:plc:b"} 37 + if !reflect.DeepEqual(dids, want) { 38 + t.Fatalf("dids: got %#v want %#v", dids, want) 39 + } 40 + } 41 + 42 + func TestHTTPLightrailClientListReposByCollectionPaginatesAndDedupes(t *testing.T) { 43 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 44 + query := r.URL.Query() 45 + cursor := query.Get("cursor") 46 + switch cursor { 47 + case "": 48 + _ = json.NewEncoder(w).Encode(map[string]any{ 49 + "cursor": "page-2", 50 + "repos": []map[string]string{ 51 + {"did": "did:plc:a"}, 52 + {"did": "did:plc:b"}, 53 + }, 54 + }) 55 + case "page-2": 56 + _ = json.NewEncoder(w).Encode(map[string]any{ 57 + "repos": []map[string]string{ 58 + {"did": "did:plc:b"}, 59 + {"did": "did:plc:c"}, 60 + }, 61 + }) 62 + default: 63 + t.Fatalf("unexpected cursor %q", cursor) 64 + } 65 + })) 66 + defer srv.Close() 67 + 68 + client := NewHTTPLightrailClient() 69 + dids, err := client.ListReposByCollection( 70 + context.Background(), srv.URL, []string{"sh.tangled.repo"}, 2, 71 + ) 72 + if err != nil { 73 + t.Fatalf("list repos: %v", 
err) 74 + } 75 + 76 + want := []string{"did:plc:a", "did:plc:b", "did:plc:c"} 77 + if !reflect.DeepEqual(dids, want) { 78 + t.Fatalf("dids: got %#v want %#v", dids, want) 79 + } 80 + } 81 + 82 + func TestHTTPLightrailClientListReposByCollectionAddsRepeatedCollections(t *testing.T) { 83 + var got url.Values 84 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 85 + got = r.URL.Query() 86 + _ = json.NewEncoder(w).Encode(map[string]any{"repos": []map[string]string{}}) 87 + })) 88 + defer srv.Close() 89 + 90 + client := NewHTTPLightrailClient() 91 + if _, err := client.ListReposByCollection( 92 + context.Background(), 93 + srv.URL, 94 + []string{"sh.tangled.actor.profile", "sh.tangled.repo"}, 95 + 50, 96 + ); err != nil { 97 + t.Fatalf("list repos: %v", err) 98 + } 99 + 100 + if got.Get("limit") != "50" { 101 + t.Fatalf("limit: got %q", got.Get("limit")) 102 + } 103 + if values := got["collection"]; !reflect.DeepEqual(values, []string{ 104 + "sh.tangled.actor.profile", 105 + "sh.tangled.repo", 106 + }) { 107 + t.Fatalf("collections: got %#v", values) 108 + } 109 + } 110 + 111 + func TestHTTPLightrailClientListReposByCollectionErrorsOnNonSuccess(t *testing.T) { 112 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { 113 + http.Error(w, `{"message":"boom"}`, http.StatusBadGateway) 114 + })) 115 + defer srv.Close() 116 + 117 + client := NewHTTPLightrailClient() 118 + if _, err := client.ListReposByCollection( 119 + context.Background(), srv.URL, []string{"sh.tangled.repo"}, 100, 120 + ); err == nil { 121 + t.Fatal("expected error") 122 + } 123 + }
+29 -6
packages/api/internal/backfill/types.go
··· 2 2 3 3 import "time" 4 4 5 + const ( 6 + SourceLightrail = "lightrail" 7 + SourceGraph = "graph" 8 + DefaultLightrailURL = "https://lightrail.microcosm.blue" 9 + DefaultPageLimit = 10000 10 + ) 11 + 12 + var DefaultCollections = []string{ 13 + "sh.tangled.repo", 14 + "sh.tangled.repo.issue", 15 + "sh.tangled.repo.issue.state", 16 + "sh.tangled.repo.issue.comment", 17 + "sh.tangled.repo.pull", 18 + "sh.tangled.repo.pull.status", 19 + "sh.tangled.repo.pull.comment", 20 + "sh.tangled.string", 21 + "sh.tangled.actor.profile", 22 + } 23 + 5 24 // Options configures a backfill run. 6 25 type Options struct { 7 - SeedsPath string 8 - MaxHops int 9 - DryRun bool 10 - Concurrency int 11 - BatchSize int 12 - BatchDelay time.Duration 26 + SeedsPath string 27 + MaxHops int 28 + DryRun bool 29 + Concurrency int 30 + BatchSize int 31 + BatchDelay time.Duration 32 + Source string 33 + LightrailURL string 34 + Collections []string 35 + PageLimit int 13 36 } 14 37 15 38 // DiscoveredUser contains crawl metadata for an included DID.
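How the new constants are meant to backstop unset `Options` fields can be sketched as follows (hypothetical `withDefaults` helper with a trimmed collection list; in this change the actual defaulting happens in flag registration and inside the runner and client):

```go
package main

import "fmt"

// Trimmed copies of the defaults added in types.go.
const (
	SourceLightrail     = "lightrail"
	DefaultLightrailURL = "https://lightrail.microcosm.blue"
	DefaultPageLimit    = 10000
)

// DefaultCollections here lists two of the nine sh.tangled collections
// from the real change, for brevity.
var DefaultCollections = []string{"sh.tangled.repo", "sh.tangled.actor.profile"}

// opts is a trimmed copy of the new lightrail-related Options fields.
type opts struct {
	Source       string
	LightrailURL string
	Collections  []string
	PageLimit    int
}

// withDefaults fills zero values with the package defaults, copying the
// collection slice so callers cannot mutate the shared default.
func withDefaults(o opts) opts {
	if o.Source == "" {
		o.Source = SourceLightrail
	}
	if o.LightrailURL == "" {
		o.LightrailURL = DefaultLightrailURL
	}
	if len(o.Collections) == 0 {
		o.Collections = append([]string(nil), DefaultCollections...)
	}
	if o.PageLimit <= 0 {
		o.PageLimit = DefaultPageLimit
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", withDefaults(opts{}))
}
```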
+39 -28
packages/api/internal/view/static/search.js
··· 1 1 function searchApp() { 2 2 const TANGLED_BASE = "https://tangled.org"; 3 + const PDS_BASE = "https://pds.ls"; 4 + const DOCUMENTS_BASE = "/documents"; 3 5 4 6 return { 5 7 query: "", ··· 89 91 this.doSearch(false); 90 92 }, 91 93 92 - resultMode(r) { 93 - return this.resolveResult(r).mode; 94 - }, 95 - 96 94 resultURL(r) { 97 95 return this.resolveResult(r).url; 98 96 }, ··· 101 99 return this.resolveResult(r).warning; 102 100 }, 103 101 102 + jsonURL(r) { 103 + return DOCUMENTS_BASE + "/" + encodeURIComponent(r.id); 104 + }, 105 + 106 + pdsURL(r) { 107 + return r.at_uri ? PDS_BASE + "/" + r.at_uri : ""; 108 + }, 109 + 104 110 resolveResult(r) { 105 111 const parsed = this.parseATURI(r.at_uri); 106 112 const author = this.normalizeOwner(r.author_handle) || this.normalizeSegment(r.did) || parsed.did; 107 113 const repoOwner = this.normalizeOwner(r.repo_owner_handle) || author; 108 114 const repoName = this.normalizeSegment(r.repo_name); 109 115 110 - if (r.record_type === "issue") { 111 - if (!r.at_uri) { 112 - return { 113 - mode: "none", 114 - url: "", 115 - warning: "This issue is missing its AT URI, so Twister cannot copy or link it yet.", 116 - }; 117 - } 118 - return { mode: "copy", url: "", warning: "" }; 119 - } 120 - 121 116 if (r.record_type === "string") { 122 117 const owner = author || parsed.did; 123 118 const rkey = parsed.rkey; 124 119 const url = r.web_url || (owner && rkey ? this.buildTangledURL("strings", owner, rkey) : ""); 125 120 const warning = url ? "" : "This string is indexed from AT Protocol, but Tangled no longer has a page for it."; 126 - return { mode: url ? "link" : "none", url, warning }; 127 - } 128 - 129 - if (r.web_url) { 130 - return { mode: "link", url: r.web_url, warning: "" }; 121 + return { url, warning }; 131 122 } 132 123 133 124 let url = ""; 134 125 switch (r.record_type) { 135 126 case "profile": 136 - url = author ? this.buildTangledURL(author) : ""; 127 + url = r.web_url || (author ? 
this.buildTangledURL(author) : ""); 137 128 break; 138 129 case "repo": 139 - url = repoOwner && repoName ? this.buildTangledURL(repoOwner, repoName) : ""; 130 + url = r.web_url || (repoOwner && repoName ? this.buildTangledURL(repoOwner, repoName) : ""); 140 131 break; 132 + case "issue": 141 133 case "issue_comment": 142 134 url = repoOwner && repoName ? this.buildTangledURL(repoOwner, repoName, "issues") : ""; 143 135 break; ··· 145 137 case "pull_comment": 146 138 url = repoOwner && repoName ? this.buildTangledURL(repoOwner, repoName, "pulls") : ""; 147 139 break; 140 + default: 141 + url = r.web_url || ""; 148 142 } 149 143 150 144 return url 151 - ? { mode: "link", url, warning: "" } 145 + ? { url, warning: "" } 152 146 : { 153 - mode: "none", 154 147 url: "", 155 148 warning: "This record is indexed from AT Protocol, but Tangled does not currently expose a page for it.", 156 149 }; 157 150 }, 158 151 159 - async copyIssueATURI(r) { 152 + async copyATURI(r) { 153 + const label = this.recordLabel(r); 160 154 if (!r.at_uri) { 161 - this.showToast("Issue AT URI is unavailable."); 155 + this.showToast(label + " AT URI is unavailable."); 162 156 return; 163 157 } 164 158 165 159 try { 166 160 await this.writeClipboard(r.at_uri); 167 - this.showToast("Issue AT URI copied."); 161 + this.showToast(label + " AT URI copied."); 168 162 } catch (_) { 169 - this.showToast("Could not copy the issue AT URI."); 163 + this.showToast("Could not copy the " + label.toLowerCase() + " AT URI."); 164 + } 165 + }, 166 + 167 + recordLabel(r) { 168 + switch (r.record_type) { 169 + case "profile": 170 + return "User"; 171 + case "repo": 172 + return "Repo"; 173 + case "issue": 174 + return "Issue"; 175 + case "pull": 176 + return "Pull"; 177 + default: { 178 + const label = (r.record_type || "record").replace(/_/g, " "); 179 + return label.charAt(0).toUpperCase() + label.slice(1); 180 + } 170 181 } 171 182 }, 172 183
+17 -8
packages/api/internal/view/static/style.css
··· 110 110 font-size: .9rem; 111 111 cursor: pointer; 112 112 } 113 - .btn:hover { border-color: var(--accent); color: var(--accent); } 113 + .btn:hover { border-color: var(--accent); color: var(--accent); text-decoration: none; } 114 114 .btn:disabled { opacity: .5; cursor: default; } 115 115 .btn-primary { background: var(--accent); color: var(--bg); border-color: var(--accent); font-weight: 500; } 116 116 .btn-primary:hover { background: #6b93e8; color: var(--bg); } ··· 137 137 /* Messages */ 138 138 .msg { padding: 1rem; color: var(--text-dim); text-align: center; } 139 139 .msg-error { color: #f7768e; } 140 + .msg-empty { 141 + background: var(--surface); 142 + border: 1px solid var(--border); 143 + border-radius: var(--radius); 144 + } 145 + .msg-empty p:last-child { margin-bottom: 0; } 140 146 141 147 /* Result cards */ 142 148 .card { ··· 151 157 min-width: 0; 152 158 } 153 159 .card:hover { border-color: var(--accent); text-decoration: none; } 154 - .card-button { 155 - width: 100%; 156 - text-align: left; 157 - font: inherit; 158 - cursor: pointer; 159 - } 160 - .card-disabled:hover { border-color: var(--border); } 161 160 .card-head { 162 161 display: flex; 163 162 align-items: flex-start; ··· 209 208 min-width: 0; 210 209 overflow-wrap: anywhere; 211 210 word-break: break-word; 211 + } 212 + .card-actions { 213 + display: flex; 214 + gap: .5rem; 215 + flex-wrap: wrap; 216 + margin-top: .75rem; 217 + } 218 + .btn-card { 219 + padding: .45rem .7rem; 220 + font-size: .82rem; 212 221 } 213 222 .meta-sep::before { content: "\00b7"; margin-right: .5rem; } 214 223 .card-warning {
+68 -55
packages/api/internal/view/templates/index.html
··· 35 35 <template x-if="error"> 36 36 <div class="msg msg-error" x-text="error"></div> 37 37 </template> 38 + <template x-if="!error && !searched && !loading"> 39 + <div class="msg msg-empty"> 40 + <p>Search indexed Tangled records.</p> 41 + <p> 42 + If a record is missing, fetch it through the API to index it for search. 43 + Try 44 + <a href="/actors/desertthunder.dev" target="_blank" rel="noopener"> 45 + <code>GET /actors/desertthunder.dev</code> 46 + </a>. 47 + </p> 48 + </div> 49 + </template> 38 50 <template x-if="!error && searched && results.length === 0"> 39 - <div class="msg">No results found.</div> 51 + <div class="msg msg-empty"> 52 + <p>No results found.</p> 53 + <p> 54 + Fetching a resource through the API will index it for search. 55 + Try 56 + <a href="/actors/desertthunder.dev" target="_blank" rel="noopener"> 57 + <code>GET /actors/desertthunder.dev</code> 58 + </a> 59 + and search again. 60 + </p> 61 + </div> 40 62 </template> 41 63 <template x-for="r in results" :key="r.id"> 42 64 <div class="result-shell"> 43 - <template x-if="resultMode(r) === 'link'"> 44 - <a :href="resultURL(r)" target="_blank" rel="noopener" class="card"> 45 - <div class="card-head"> 46 - <span class="badge" x-text="r.record_type"></span> 47 - <span class="card-title" x-text="r.title || r.id"></span> 48 - </div> 49 - <div class="card-snippet" x-show="r.body_snippet" x-html="r.body_snippet"></div> 50 - <div class="card-meta"> 51 - <span x-show="r.author_handle" x-text="r.author_handle"></span> 52 - <span x-show="r.repo_name" class="meta-sep" x-text="r.repo_name"></span> 53 - <span x-show="r.updated_at" class="meta-sep" x-text="relTime(r.updated_at)"></span> 54 - </div> 55 - <div x-show="warningMessage(r)" class="card-warning" role="note"> 56 - <strong class="warning-title" x-text="warningMessage(r)"></strong> 57 - <code x-show="r.at_uri" class="warning-uri" x-text="r.at_uri"></code> 58 - </div> 59 - </a> 60 - </template> 61 - <template x-if="resultMode(r) === 'copy'"> 62 - 
<button type="button" class="card card-button" @click="copyIssueATURI(r)"> 63 - <div class="card-head"> 64 - <span class="badge" x-text="r.record_type"></span> 65 - <span class="card-title" x-text="r.title || r.id"></span> 66 - </div> 67 - <div class="card-snippet" x-show="r.body_snippet" x-html="r.body_snippet"></div> 68 - <div class="card-meta"> 69 - <span x-show="r.author_handle" x-text="r.author_handle"></span> 70 - <span x-show="r.repo_name" class="meta-sep" x-text="r.repo_name"></span> 71 - <span x-show="r.updated_at" class="meta-sep" x-text="relTime(r.updated_at)"></span> 72 - </div> 73 - <div class="card-warning" role="note"> 74 - <strong class="warning-title">Click to copy the issue AT URI.</strong> 75 - <code x-show="r.at_uri" class="warning-uri" x-text="r.at_uri"></code> 76 - </div> 77 - </button> 78 - </template> 79 - <template x-if="resultMode(r) === 'none'"> 80 - <article class="card card-disabled"> 81 - <div class="card-head"> 82 - <span class="badge" x-text="r.record_type"></span> 83 - <span class="card-title" x-text="r.title || r.id"></span> 84 - </div> 85 - <div class="card-snippet" x-show="r.body_snippet" x-html="r.body_snippet"></div> 86 - <div class="card-meta"> 87 - <span x-show="r.author_handle" x-text="r.author_handle"></span> 88 - <span x-show="r.repo_name" class="meta-sep" x-text="r.repo_name"></span> 89 - <span x-show="r.updated_at" class="meta-sep" x-text="relTime(r.updated_at)"></span> 90 - </div> 91 - <div x-show="warningMessage(r)" class="card-warning" role="note"> 92 - <strong class="warning-title" x-text="warningMessage(r)"></strong> 93 - <code x-show="r.at_uri" class="warning-uri" x-text="r.at_uri"></code> 94 - </div> 95 - </article> 96 - </template> 65 + <article class="card"> 66 + <div class="card-head"> 67 + <span class="badge" x-text="r.record_type"></span> 68 + <span class="card-title" x-text="r.title || r.id"></span> 69 + </div> 70 + <div class="card-snippet" x-show="r.body_snippet" x-html="r.body_snippet"></div> 71 + <div 
class="card-snippet" x-show="!r.body_snippet && r.summary" x-text="r.summary"></div> 72 + <div class="card-meta"> 73 + <span x-show="r.author_handle" x-text="r.author_handle"></span> 74 + <span x-show="r.repo_name" class="meta-sep" x-text="r.repo_name"></span> 75 + <span x-show="r.updated_at" class="meta-sep" x-text="relTime(r.updated_at)"></span> 76 + </div> 77 + <div class="card-actions"> 78 + <a 79 + x-show="resultURL(r)" 80 + :href="resultURL(r)" 81 + target="_blank" 82 + rel="noopener" 83 + class="btn btn-card" 84 + >Open on Tangled</a> 85 + <button 86 + x-show="r.at_uri" 87 + type="button" 88 + class="btn btn-card" 89 + @click="copyATURI(r)" 90 + >Copy AT URI</button> 91 + <a 92 + x-show="r.at_uri" 93 + :href="pdsURL(r)" 94 + target="_blank" 95 + rel="noopener" 96 + class="btn btn-card" 97 + >Open in pds.ls</a> 98 + <a 99 + :href="jsonURL(r)" 100 + target="_blank" 101 + rel="noopener" 102 + class="btn btn-card" 103 + >View JSON</a> 104 + </div> 105 + <div x-show="warningMessage(r)" class="card-warning" role="note"> 106 + <strong class="warning-title" x-text="warningMessage(r)"></strong> 107 + <code x-show="r.at_uri" class="warning-uri" x-text="r.at_uri"></code> 108 + </div> 109 + </article> 97 110 </div> 98 111 </template> 99 112 <template x-if="hasMore">
+3
packages/api/internal/view/view_test.go
··· 21 21 if !strings.Contains(body, "Search Tangled") { 22 22 t.Fatalf("expected search home content, got body %q", body) 23 23 } 24 + if !strings.Contains(body, "GET /actors/desertthunder.dev") { 25 + t.Fatalf("expected search empty state example, got body %q", body) 26 + } 24 27 if strings.Contains(body, "Health Endpoints") { 25 28 t.Fatalf("expected search home page, got health page content") 26 29 }
+7 -4
packages/api/main.go
··· 212 212 213 213 cmd := &cobra.Command{ 214 214 Use: "backfill", 215 - Short: "Discover users from seeds and register repos for Tap backfill", 215 + Short: "Discover repos and register them with Tap backfill", 216 216 RunE: func(cmd *cobra.Command, args []string) error { 217 217 cfg, err := config.Load(config.LoadOptions{Local: *local}) 218 218 if err != nil { ··· 265 265 }, 266 266 } 267 267 268 - cmd.Flags().StringVar(&opts.SeedsPath, "seeds", "", "Seed source: file path or comma-separated DIDs/handles (required)") 269 - cmd.Flags().IntVar(&opts.MaxHops, "max-hops", 2, "Max fan-out depth from seeds") 268 + cmd.Flags().StringVar(&opts.Source, "source", backfill.SourceLightrail, "Discovery source: lightrail or graph") 269 + cmd.Flags().StringVar(&opts.LightrailURL, "lightrail-url", backfill.DefaultLightrailURL, "Base URL for listReposByCollection discovery") 270 + cmd.Flags().StringArrayVar(&opts.Collections, "collection", nil, "Collection to discover via Lightrail (repeatable)") 271 + cmd.Flags().IntVar(&opts.PageLimit, "page-limit", backfill.DefaultPageLimit, "Max DIDs to request per Lightrail page") 272 + cmd.Flags().StringVar(&opts.SeedsPath, "seeds", "", "Seed source for --source graph: file path or comma-separated DIDs/handles") 273 + cmd.Flags().IntVar(&opts.MaxHops, "max-hops", 2, "Max fan-out depth from graph seeds") 270 274 cmd.Flags().BoolVar(&opts.DryRun, "dry-run", false, "Print discovery plan without mutating Tap") 271 275 cmd.Flags().IntVar(&opts.Concurrency, "concurrency", 5, "Parallel discovery workers") 272 276 cmd.Flags().IntVar(&opts.BatchSize, "batch-size", 10, "DIDs per /repos/add request") 273 277 cmd.Flags().DurationVar(&opts.BatchDelay, "batch-delay", time.Second, "Delay between Tap /repos/add batches") 274 - _ = cmd.MarkFlagRequired("seeds") 275 278 276 279 return cmd 277 280 }