a love letter to tangled (android, iOS, and a search API)
19
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: backfill

+1626 -16
+9
docs/api/seeds.txt
··· 1 + # Example seed handles for Twister graph backfill 2 + # One DID or handle per line. Comments and blank lines are ignored. 3 + 4 + anirudh.fi 5 + atprotocol.dev 6 + zzstoatzz.io 7 + oppi.li 8 + desertthunder.dev 9 + tangled.org
+1 -1
docs/api/specs/07-graph-backfill.md
··· 96 96 97 97 | Flag | Default | Description | 98 98 | --------------- | -------- | ----------------------------------------------- | 99 - | `--seeds` | required | Path to seed file | 99 + | `--seeds` | required | Seed source: file path or comma-separated list | 100 100 | `--max-hops` | `2` | Max fan-out depth from seed users | 101 101 | `--dry-run` | `false` | List discovered users without submitting to Tap | 102 102 | `--concurrency` | `5` | Parallel discovery workers |
+11 -9
docs/api/tasks/phase-1-mvp.md
··· 134 134 135 135 ### Tasks 136 136 137 - - [ ] Implement `backfill` subcommand with flags: 137 + - [x] Implement `backfill` subcommand with flags: 138 138 - `--seeds <file>` — required seed file path 139 139 - `--max-hops <n>` — depth limit for fan-out (default: 2) 140 140 - `--dry-run` — print the discovery plan without mutating Tap 141 141 - `--concurrency <n>` — parallel discovery workers (default: 5) 142 142 - `--batch-size <n>` — DIDs per `/repos/add` request 143 143 - `--batch-delay <duration>` — delay between Tap registration batches 144 - - [ ] Implement seed file parsing: 144 + - [x] Implement seed file parsing: 145 145 - One DID or handle per line 146 146 - `#` comments allowed 147 147 - Blank lines ignored 148 148 - Handles resolved to DIDs before graph expansion 149 - - [ ] Decide and document the initial seed file location for operators: 149 + - [x] Decide and document the initial seed file location for operators: 150 150 - Repository-managed example file for format/reference 151 151 - Deployment-specific runtime file or mounted secret for real runs 152 - - [ ] Implement graph discovery: 152 + - Implemented: `docs/api/seeds.txt` and `packages/api/internal/backfill/doc.go` 153 + - [x] Implement graph discovery: 153 154 1. Start from hop-0 seed users 154 155 2. Fetch `sh.tangled.graph.follow` records and collect subject DIDs 155 156 3. Fetch repo collaborators by inspecting repos, issues, PRs, and comments 156 157 4. Enqueue newly discovered DIDs with hop metadata 157 158 5. 
Stop expanding beyond `max-hops` 158 - - [ ] Track discovery metadata for logs: 159 + - [x] Track discovery metadata for logs: 159 160 - source DID 160 161 - hop depth 161 162 - discovery reason (`seed`, `follow`, `collaborator`) 162 - - [ ] Integrate with Tap admin endpoints: 163 + - [x] Integrate with Tap admin endpoints: 163 164 - `GET /info/:did` to skip already-tracked repos when practical 164 165 - `POST /repos/add` to register new DIDs for backfill 165 - - [ ] Make the command safe to re-run: 166 + - [x] Make the command safe to re-run: 166 167 - in-memory visited DID set during crawl 167 168 - tolerate duplicate `/repos/add` 168 169 - rely on index upsert idempotency for re-delivered records 169 - - [ ] Add operator-friendly logging: 170 + - [x] Add operator-friendly logging: 170 171 - seed count 171 172 - users discovered per hop 172 173 - already-tracked vs newly-submitted DIDs 173 174 - batch progress 174 175 - final totals 175 - - [ ] Add a short runbook covering: 176 + - [x] Add a short runbook covering: 176 177 - first bootstrap against an empty database 177 178 - repeat run after expanding the seed list 178 179 - dry-run before production mutation 180 + - Implemented: `packages/api/internal/backfill/doc.go` 179 181 180 182 ### Verification 181 183
+2 -1
packages/api/go.mod
··· 3 3 go 1.25.0 4 4 5 5 require ( 6 + github.com/coder/websocket v1.8.12 7 + github.com/joho/godotenv v1.5.1 6 8 github.com/spf13/cobra v1.10.2 7 9 github.com/tursodatabase/libsql-client-go v0.0.0-20251219100830-236aa1ff8acc 8 10 modernc.org/sqlite v1.47.0 ··· 10 12 11 13 require ( 12 14 github.com/antlr4-go/antlr/v4 v4.13.0 // indirect 13 - github.com/coder/websocket v1.8.12 // indirect 14 15 github.com/dustin/go-humanize v1.0.1 // indirect 15 16 github.com/google/uuid v1.6.0 // indirect 16 17 github.com/inconshreveable/mousetrap v1.1.0 // indirect
+2
packages/api/go.sum
··· 13 13 github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= 14 14 github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= 15 15 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= 16 + github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= 17 + github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= 16 18 github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= 17 19 github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= 18 20 github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
+267
packages/api/internal/backfill/backfill.go
··· 1 + package backfill 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "sort" 8 + "sync" 9 + "time" 10 + ) 11 + 12 + type discoveryStore interface { 13 + GetRepoCollaborators(ctx context.Context, repoOwnerDID string) ([]string, error) 14 + } 15 + 16 + // Runner executes seed resolution, graph discovery, and Tap registration. 17 + type Runner struct { 18 + store discoveryStore 19 + tap tapAdmin 20 + resolver handleResolver 21 + follows followFetcher 22 + log *slog.Logger 23 + } 24 + 25 + func NewRunner(store discoveryStore, tap tapAdmin, resolver handleResolver, log *slog.Logger) *Runner { 26 + return NewRunnerWithDeps(store, tap, resolver, NewHTTPFollowFetcher(), log) 27 + } 28 + 29 + func NewRunnerWithDeps(store discoveryStore, tap tapAdmin, resolver handleResolver, follows followFetcher, log *slog.Logger) *Runner { 30 + if log == nil { 31 + log = slog.Default() 32 + } 33 + if follows == nil { 34 + follows = NewHTTPFollowFetcher() 35 + } 36 + return &Runner{store: store, tap: tap, resolver: resolver, follows: follows, log: log} 37 + } 38 + 39 + func (r *Runner) Run(ctx context.Context, opts Options) error { 40 + if opts.SeedsPath == "" { 41 + return fmt.Errorf("--seeds is required") 42 + } 43 + if opts.MaxHops < 0 { 44 + return fmt.Errorf("--max-hops must be >= 0") 45 + } 46 + if opts.Concurrency <= 0 { 47 + opts.Concurrency = 5 48 + } 49 + if opts.BatchSize <= 0 { 50 + opts.BatchSize = 10 51 + } 52 + if opts.BatchDelay < 0 { 53 + return fmt.Errorf("--batch-delay must be >= 0") 54 + } 55 + 56 + seedEntries, err := parseSeedInput(opts.SeedsPath) 57 + if err != nil { 58 + return err 59 + } 60 + seeds, err := r.resolveSeeds(ctx, seedEntries) 61 + if err != nil { 62 + return err 63 + } 64 + if len(seeds) == 0 { 65 + return fmt.Errorf("no valid seed DIDs resolved") 66 + } 67 + 68 + r.log.Info("starting backfill discovery", 69 + slog.Int("seed_count", len(seeds)), 70 + slog.Int("max_hops", opts.MaxHops), 71 + slog.Int("concurrency", opts.Concurrency), 72 + 
) 73 + 74 + discovered, err := r.discover(ctx, seeds, opts.MaxHops, opts.Concurrency) 75 + if err != nil { 76 + return err 77 + } 78 + 79 + r.log.Info("discovery complete", slog.Int("discovered_total", len(discovered))) 80 + if opts.DryRun { 81 + r.log.Info("dry-run mode enabled; skipping Tap mutations") 82 + return nil 83 + } 84 + 85 + alreadyTracked := 0 86 + toSubmit := make([]string, 0, len(discovered)) 87 + for _, user := range discovered { 88 + tracked, err := r.tap.IsTracked(ctx, user.DID) 89 + if err != nil { 90 + return fmt.Errorf("tap info for %s: %w", user.DID, err) 91 + } 92 + if tracked { 93 + alreadyTracked++ 94 + continue 95 + } 96 + toSubmit = append(toSubmit, user.DID) 97 + } 98 + 99 + r.log.Info("tap classification complete", 100 + slog.Int("already_tracked", alreadyTracked), 101 + slog.Int("to_submit", len(toSubmit)), 102 + ) 103 + 104 + submitted := 0 105 + for i := 0; i < len(toSubmit); i += opts.BatchSize { 106 + end := i + opts.BatchSize 107 + if end > len(toSubmit) { 108 + end = len(toSubmit) 109 + } 110 + batch := toSubmit[i:end] 111 + if err := r.tap.AddRepos(ctx, batch); err != nil { 112 + return fmt.Errorf("submit batch %d-%d: %w", i, end, err) 113 + } 114 + submitted += len(batch) 115 + r.log.Info("submitted Tap batch", 116 + slog.Int("batch_start", i), 117 + slog.Int("batch_end", end), 118 + slog.Int("batch_size", len(batch)), 119 + slog.Int("submitted_total", submitted), 120 + ) 121 + if end < len(toSubmit) && opts.BatchDelay > 0 { 122 + select { 123 + case <-ctx.Done(): 124 + return ctx.Err() 125 + case <-time.After(opts.BatchDelay): 126 + } 127 + } 128 + } 129 + 130 + r.log.Info("backfill complete", 131 + slog.Int("discovered_total", len(discovered)), 132 + slog.Int("already_tracked", alreadyTracked), 133 + slog.Int("submitted", submitted), 134 + ) 135 + return nil 136 + } 137 + 138 + func (r *Runner) resolveSeeds(ctx context.Context, entries []seedEntry) ([]string, error) { 139 + seen := map[string]bool{} 140 + seeds := 
make([]string, 0, len(entries)) 141 + for _, entry := range entries { 142 + if entry.isDID { 143 + seen[entry.raw] = true 144 + seeds = append(seeds, entry.raw) 145 + continue 146 + } 147 + did, err := r.resolver.Resolve(ctx, entry.raw) 148 + if err != nil { 149 + return nil, fmt.Errorf("resolve handle at line %d (%s): %w", entry.lineNo, entry.raw, err) 150 + } 151 + if seen[did] { 152 + continue 153 + } 154 + seen[did] = true 155 + seeds = append(seeds, did) 156 + } 157 + return seeds, nil 158 + } 159 + 160 + func (r *Runner) discover(ctx context.Context, seeds []string, maxHops int, concurrency int) ([]DiscoveredUser, error) { 161 + visited := map[string]DiscoveredUser{} 162 + ordered := make([]DiscoveredUser, 0) 163 + frontier := make([]DiscoveredUser, 0, len(seeds)) 164 + for _, did := range seeds { 165 + user := DiscoveredUser{DID: did, Hop: 0, Source: did, Reason: "seed"} 166 + visited[did] = user 167 + ordered = append(ordered, user) 168 + frontier = append(frontier, user) 169 + } 170 + 171 + for hop := 0; hop <= maxHops && len(frontier) > 0; hop++ { 172 + r.log.Info("processing discovery hop", slog.Int("hop", hop), slog.Int("users", len(frontier))) 173 + if hop == maxHops { 174 + break 175 + } 176 + 177 + type expansion struct { 178 + node DiscoveredUser 179 + follows []string 180 + collaborators []string 181 + err error 182 + } 183 + 184 + jobs := make(chan DiscoveredUser) 185 + results := make(chan expansion, len(frontier)) 186 + var wg sync.WaitGroup 187 + for i := 0; i < concurrency; i++ { 188 + wg.Add(1) 189 + go func() { 190 + defer wg.Done() 191 + for node := range jobs { 192 + follows, err := r.follows.ListFollowSubjects(ctx, node.DID) 193 + if err != nil { 194 + results <- expansion{node: node, err: fmt.Errorf("follows: %w", err)} 195 + continue 196 + } 197 + collaborators, err := r.store.GetRepoCollaborators(ctx, node.DID) 198 + if err != nil { 199 + results <- expansion{node: node, err: fmt.Errorf("collaborators: %w", err)} 200 + continue 201 + } 
202 + results <- expansion{node: node, follows: follows, collaborators: collaborators} 203 + } 204 + }() 205 + } 206 + 207 + go func() { 208 + for _, node := range frontier { 209 + jobs <- node 210 + } 211 + close(jobs) 212 + wg.Wait() 213 + close(results) 214 + }() 215 + 216 + nextByDID := map[string]DiscoveredUser{} 217 + for res := range results { 218 + if res.err != nil { 219 + r.log.Warn("discovery expansion failed", slog.String("did", res.node.DID), slog.String("error", res.err.Error())) 220 + continue 221 + } 222 + 223 + r.log.Info("discovery expansion", 224 + slog.String("did", res.node.DID), 225 + slog.Int("hop", hop), 226 + slog.Int("follows", len(res.follows)), 227 + slog.Int("collaborators", len(res.collaborators)), 228 + ) 229 + 230 + for _, did := range res.follows { 231 + if !isDID(did) { 232 + continue 233 + } 234 + if _, exists := visited[did]; exists { 235 + continue 236 + } 237 + if _, exists := nextByDID[did]; exists { 238 + continue 239 + } 240 + nextByDID[did] = DiscoveredUser{DID: did, Hop: hop + 1, Source: res.node.DID, Reason: "follow"} 241 + } 242 + for _, did := range res.collaborators { 243 + if !isDID(did) { 244 + continue 245 + } 246 + if _, exists := visited[did]; exists { 247 + continue 248 + } 249 + if _, exists := nextByDID[did]; exists { 250 + continue 251 + } 252 + nextByDID[did] = DiscoveredUser{DID: did, Hop: hop + 1, Source: res.node.DID, Reason: "collaborator"} 253 + } 254 + } 255 + 256 + next := make([]DiscoveredUser, 0, len(nextByDID)) 257 + for _, user := range nextByDID { 258 + visited[user.DID] = user 259 + next = append(next, user) 260 + ordered = append(ordered, user) 261 + } 262 + sort.Slice(next, func(i, j int) bool { return next[i].DID < next[j].DID }) 263 + frontier = next 264 + } 265 + 266 + return ordered, nil 267 + }
+118
packages/api/internal/backfill/backfill_test.go
··· 1 + package backfill 2 + 3 + import ( 4 + "context" 5 + "io" 6 + "log/slog" 7 + "os" 8 + "path/filepath" 9 + "testing" 10 + ) 11 + 12 + type fakeStore struct { 13 + collaborators map[string][]string 14 + } 15 + 16 + func (f *fakeStore) GetRepoCollaborators(_ context.Context, did string) ([]string, error) { 17 + return f.collaborators[did], nil 18 + } 19 + 20 + type fakeFollowFetcher struct { 21 + follows map[string][]string 22 + } 23 + 24 + func (f *fakeFollowFetcher) ListFollowSubjects(_ context.Context, did string) ([]string, error) { 25 + return f.follows[did], nil 26 + } 27 + 28 + type fakeTapAdmin struct { 29 + tracked map[string]bool 30 + added [][]string 31 + } 32 + 33 + func (f *fakeTapAdmin) IsTracked(_ context.Context, did string) (bool, error) { 34 + return f.tracked[did], nil 35 + } 36 + 37 + func (f *fakeTapAdmin) AddRepos(_ context.Context, dids []string) error { 38 + batch := make([]string, len(dids)) 39 + copy(batch, dids) 40 + f.added = append(f.added, batch) 41 + return nil 42 + } 43 + 44 + type fakeResolver struct { 45 + mapping map[string]string 46 + } 47 + 48 + func (r *fakeResolver) Resolve(_ context.Context, handle string) (string, error) { 49 + if did, ok := r.mapping[handle]; ok { 50 + return did, nil 51 + } 52 + return "", io.EOF 53 + } 54 + 55 + func TestRunner_DiscoveryAndSubmit(t *testing.T) { 56 + st := &fakeStore{ 57 + collaborators: map[string][]string{ 58 + "did:plc:seed": {"did:plc:c1"}, 59 + }, 60 + } 61 + follows := &fakeFollowFetcher{follows: map[string][]string{"did:plc:seed": {"did:plc:f1"}}} 62 + tap := &fakeTapAdmin{tracked: map[string]bool{"did:plc:f1": true}} 63 + resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 64 + log := slog.New(slog.NewTextHandler(io.Discard, nil)) 65 + r := NewRunnerWithDeps(st, tap, resolver, follows, log) 66 + 67 + dir := t.TempDir() 68 + seedsPath := filepath.Join(dir, "seeds.txt") 69 + if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 
0o644); err != nil { 70 + t.Fatalf("write seeds: %v", err) 71 + } 72 + 73 + err := r.Run(context.Background(), Options{ 74 + SeedsPath: seedsPath, 75 + MaxHops: 1, 76 + Concurrency: 2, 77 + BatchSize: 2, 78 + }) 79 + if err != nil { 80 + t.Fatalf("run backfill: %v", err) 81 + } 82 + 83 + if len(tap.added) != 1 { 84 + t.Fatalf("expected one batch, got %d", len(tap.added)) 85 + } 86 + if len(tap.added[0]) != 2 { 87 + t.Fatalf("expected 2 dids submitted, got %#v", tap.added[0]) 88 + } 89 + } 90 + 91 + func TestRunner_DryRunSkipsMutations(t *testing.T) { 92 + st := &fakeStore{collaborators: map[string][]string{}} 93 + follows := &fakeFollowFetcher{follows: map[string][]string{}} 94 + tap := &fakeTapAdmin{tracked: map[string]bool{}} 95 + resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 96 + log := slog.New(slog.NewTextHandler(io.Discard, nil)) 97 + r := NewRunnerWithDeps(st, tap, resolver, follows, log) 98 + 99 + dir := t.TempDir() 100 + seedsPath := filepath.Join(dir, "seeds.txt") 101 + if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 0o644); err != nil { 102 + t.Fatalf("write seeds: %v", err) 103 + } 104 + 105 + err := r.Run(context.Background(), Options{ 106 + SeedsPath: seedsPath, 107 + MaxHops: 0, 108 + DryRun: true, 109 + Concurrency: 1, 110 + BatchSize: 10, 111 + }) 112 + if err != nil { 113 + t.Fatalf("run dry-run backfill: %v", err) 114 + } 115 + if len(tap.added) != 0 { 116 + t.Fatalf("expected no tap submissions in dry-run, got %#v", tap.added) 117 + } 118 + }
+69
packages/api/internal/backfill/doc.go
// Package backfill provides graph bootstrap tooling for Twister.
//
// # Backfill Runbook
//
// This runbook covers the initial graph bootstrap and repeat runs using:
//
//	twister backfill
//
// # Seeds Input
//
// The `--seeds` flag supports either of these forms:
//
// 1. File path:
//
//	twister backfill --seeds /etc/twister/seeds.txt
//
// 2. Comma-separated inline list:
//
//	twister backfill --seeds anirudh.fi,atprotocol.dev,oppi.li
//
// Supported seed entries are DIDs and handles.
//
// Repository-managed example seed file:
//
//	docs/api/seeds.txt
//
// The runtime seed file is typically mounted outside the repo, for example:
//
//	/etc/twister/seeds.txt
//
// # Prerequisites
//
// Required environment variables:
//
//   - TURSO_DATABASE_URL
//   - TURSO_AUTH_TOKEN (for non-file Turso URLs)
//   - TAP_URL
//   - TAP_AUTH_PASSWORD
//
// # First Bootstrap
//
// 1. Copy and customize the seeds:
//
//	cp docs/api/seeds.txt /tmp/twister-seeds.txt
//
// 2. Run a dry-run first:
//
//	twister backfill --seeds /tmp/twister-seeds.txt --max-hops 2 --dry-run
//
// 3. Run the real backfill:
//
//	twister backfill --seeds /tmp/twister-seeds.txt --max-hops 2 --concurrency 5 --batch-size 10 --batch-delay 1s
//
// Watch the logs for the seed count, hop-level discoveries, already-tracked
// vs submitted users, and batch progress totals.
//
// # Repeat Run
//
// Append new candidate users to the seed source, dry-run, then run the real
// command again. Reruns are safe because discovery deduplicates in-memory and
// Tap /repos/add is treated as idempotent.
//
// # Dry-Run Safety
//
// Before any production mutation:
//   - include --dry-run
//   - confirm only discovery output appears
//   - confirm no Tap mutation side effects
package backfill
+145
packages/api/internal/backfill/follows.go
··· 1 + package backfill 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "net/http" 8 + "net/url" 9 + "strings" 10 + "time" 11 + ) 12 + 13 + const ( 14 + plcDirectoryBase = "https://plc.directory" 15 + followCollection = "sh.tangled.graph.follow" 16 + ) 17 + 18 + type followFetcher interface { 19 + ListFollowSubjects(ctx context.Context, did string) ([]string, error) 20 + } 21 + 22 + // HTTPFollowFetcher resolves a DID's PDS endpoint and reads follow records 23 + // directly from com.atproto.repo.listRecords. 24 + type HTTPFollowFetcher struct { 25 + client *http.Client 26 + } 27 + 28 + func NewHTTPFollowFetcher() *HTTPFollowFetcher { 29 + return &HTTPFollowFetcher{ 30 + client: &http.Client{Timeout: 15 * time.Second}, 31 + } 32 + } 33 + 34 + func (f *HTTPFollowFetcher) ListFollowSubjects(ctx context.Context, did string) ([]string, error) { 35 + pdsEndpoint, err := f.resolvePDSEndpoint(ctx, did) 36 + if err != nil { 37 + return nil, err 38 + } 39 + 40 + seen := map[string]bool{} 41 + var subjects []string 42 + cursor := "" 43 + 44 + for { 45 + u, err := url.Parse(strings.TrimSuffix(pdsEndpoint, "/") + "/xrpc/com.atproto.repo.listRecords") 46 + if err != nil { 47 + return nil, fmt.Errorf("build listRecords url: %w", err) 48 + } 49 + q := u.Query() 50 + q.Set("repo", did) 51 + q.Set("collection", followCollection) 52 + q.Set("limit", "100") 53 + if cursor != "" { 54 + q.Set("cursor", cursor) 55 + } 56 + u.RawQuery = q.Encode() 57 + 58 + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) 59 + if err != nil { 60 + return nil, fmt.Errorf("build listRecords request: %w", err) 61 + } 62 + 63 + resp, err := f.client.Do(req) 64 + if err != nil { 65 + return nil, fmt.Errorf("listRecords request: %w", err) 66 + } 67 + 68 + var payload struct { 69 + Cursor string `json:"cursor"` 70 + Records []struct { 71 + Value map[string]any `json:"value"` 72 + } `json:"records"` 73 + } 74 + if resp.StatusCode != http.StatusOK { 75 + _ = 
resp.Body.Close() 76 + return nil, fmt.Errorf("listRecords failed: status %d", resp.StatusCode) 77 + } 78 + if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil { 79 + _ = resp.Body.Close() 80 + return nil, fmt.Errorf("decode listRecords response: %w", err) 81 + } 82 + _ = resp.Body.Close() 83 + 84 + for _, rec := range payload.Records { 85 + subject, _ := rec.Value["subject"].(string) 86 + if !isDID(subject) || seen[subject] { 87 + continue 88 + } 89 + seen[subject] = true 90 + subjects = append(subjects, subject) 91 + } 92 + 93 + if payload.Cursor == "" || payload.Cursor == cursor { 94 + break 95 + } 96 + cursor = payload.Cursor 97 + } 98 + 99 + return subjects, nil 100 + } 101 + 102 + func (f *HTTPFollowFetcher) resolvePDSEndpoint(ctx context.Context, did string) (string, error) { 103 + var didDocURL string 104 + switch { 105 + case strings.HasPrefix(did, "did:plc:"): 106 + didDocURL = plcDirectoryBase + "/" + url.PathEscape(did) 107 + case strings.HasPrefix(did, "did:web:"): 108 + hostAndPath := strings.TrimPrefix(did, "did:web:") 109 + hostAndPath = strings.ReplaceAll(hostAndPath, ":", "/") 110 + didDocURL = "https://" + hostAndPath + "/.well-known/did.json" 111 + default: 112 + return "", fmt.Errorf("unsupported did type for pds resolution: %s", did) 113 + } 114 + 115 + req, err := http.NewRequestWithContext(ctx, http.MethodGet, didDocURL, nil) 116 + if err != nil { 117 + return "", fmt.Errorf("build did doc request: %w", err) 118 + } 119 + resp, err := f.client.Do(req) 120 + if err != nil { 121 + return "", fmt.Errorf("did doc request: %w", err) 122 + } 123 + defer resp.Body.Close() 124 + if resp.StatusCode != http.StatusOK { 125 + return "", fmt.Errorf("did doc lookup failed: status %d", resp.StatusCode) 126 + } 127 + 128 + var didDoc struct { 129 + Service []struct { 130 + Type string `json:"type"` 131 + ServiceEndpoint string `json:"serviceEndpoint"` 132 + } `json:"service"` 133 + } 134 + if err := json.NewDecoder(resp.Body).Decode(&didDoc); 
err != nil { 135 + return "", fmt.Errorf("decode did doc: %w", err) 136 + } 137 + 138 + for _, service := range didDoc.Service { 139 + if service.Type == "AtprotoPersonalDataServer" && strings.TrimSpace(service.ServiceEndpoint) != "" { 140 + return strings.TrimSpace(service.ServiceEndpoint), nil 141 + } 142 + } 143 + 144 + return "", fmt.Errorf("no atproto pds endpoint in did document") 145 + }
+63
packages/api/internal/backfill/resolve.go
··· 1 + package backfill 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "net/http" 8 + "net/url" 9 + "time" 10 + ) 11 + 12 + const defaultIdentityService = "https://public.api.bsky.app" 13 + 14 + type handleResolver interface { 15 + Resolve(ctx context.Context, handle string) (string, error) 16 + } 17 + 18 + // HTTPHandleResolver resolves handles through com.atproto.identity.resolveHandle. 19 + type HTTPHandleResolver struct { 20 + baseURL string 21 + client *http.Client 22 + } 23 + 24 + func NewHTTPHandleResolver(baseURL string) *HTTPHandleResolver { 25 + if baseURL == "" { 26 + baseURL = defaultIdentityService 27 + } 28 + return &HTTPHandleResolver{ 29 + baseURL: baseURL, 30 + client: &http.Client{ 31 + Timeout: 10 * time.Second, 32 + }, 33 + } 34 + } 35 + 36 + func (r *HTTPHandleResolver) Resolve(ctx context.Context, handle string) (string, error) { 37 + u := r.baseURL + "/xrpc/com.atproto.identity.resolveHandle?handle=" + url.QueryEscape(handle) 38 + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) 39 + if err != nil { 40 + return "", fmt.Errorf("build resolve handle request: %w", err) 41 + } 42 + 43 + resp, err := r.client.Do(req) 44 + if err != nil { 45 + return "", fmt.Errorf("resolve handle request: %w", err) 46 + } 47 + defer resp.Body.Close() 48 + 49 + if resp.StatusCode != http.StatusOK { 50 + return "", fmt.Errorf("resolve handle failed: status %d", resp.StatusCode) 51 + } 52 + 53 + var payload struct { 54 + DID string `json:"did"` 55 + } 56 + if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil { 57 + return "", fmt.Errorf("decode resolve handle response: %w", err) 58 + } 59 + if !isDID(payload.DID) { 60 + return "", fmt.Errorf("resolve handle returned invalid did %q", payload.DID) 61 + } 62 + return payload.DID, nil 63 + }
+98
packages/api/internal/backfill/seed.go
// seedEntry is one parsed seed: a raw DID or handle plus its source position.
type seedEntry struct {
	raw    string
	isDID  bool
	lineNo int
}

// parseSeedFile reads one DID or handle per line from path. Blank lines,
// '#' comments, and duplicate raw lines are skipped; an empty result is an
// error.
func parseSeedFile(path string) ([]seedEntry, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("open seed file: %w", err)
	}
	defer file.Close()

	var (
		entries = make([]seedEntry, 0)
		seen    = map[string]bool{}
		lineNo  int
	)
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		lineNo++
		value := strings.TrimSpace(scanner.Text())
		if value == "" || strings.HasPrefix(value, "#") || seen[value] {
			continue
		}
		seen[value] = true
		entries = append(entries, seedEntry{raw: value, isDID: isDID(value), lineNo: lineNo})
	}
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("scan seed file: %w", err)
	}
	if len(entries) == 0 {
		return nil, fmt.Errorf("seed file has no valid entries")
	}
	return entries, nil
}

// parseSeedInput accepts either a seed file path, a comma-separated inline
// list, or a single inline DID/handle. A comma anywhere in the input selects
// list parsing; otherwise an existing regular file selects file parsing.
func parseSeedInput(input string) ([]seedEntry, error) {
	trimmed := strings.TrimSpace(input)
	switch {
	case trimmed == "":
		return nil, fmt.Errorf("seeds input is required")
	case strings.Contains(trimmed, ","):
		return parseSeedList(trimmed)
	}

	if info, err := os.Stat(trimmed); err == nil && !info.IsDir() {
		return parseSeedFile(trimmed)
	}

	// Not a file on disk: treat as a single inline DID/handle for convenience.
	return parseSeedList(trimmed)
}

// parseSeedList splits a comma-separated list into entries, trimming
// whitespace and dropping empties and duplicates. lineNo records the 1-based
// position within the list.
func parseSeedList(list string) ([]seedEntry, error) {
	parts := strings.Split(list, ",")
	entries := make([]seedEntry, 0, len(parts))
	seen := map[string]bool{}
	for position, part := range parts {
		value := strings.TrimSpace(part)
		if value == "" || seen[value] {
			continue
		}
		seen[value] = true
		entries = append(entries, seedEntry{raw: value, isDID: isDID(value), lineNo: position + 1})
	}
	if len(entries) == 0 {
		return nil, fmt.Errorf("seed list has no valid entries")
	}
	return entries, nil
}

// isDID reports whether v is DID-shaped (has the "did:" prefix).
func isDID(v string) bool {
	return strings.HasPrefix(v, "did:")
}
+73
packages/api/internal/backfill/seed_test.go
··· 1 + package backfill 2 + 3 + import ( 4 + "os" 5 + "path/filepath" 6 + "testing" 7 + ) 8 + 9 + func TestParseSeedFile(t *testing.T) { 10 + dir := t.TempDir() 11 + path := filepath.Join(dir, "seeds.txt") 12 + content := "\n# comment\ndid:plc:one\nalice.tangled.sh\ndid:plc:one\n\n" 13 + if err := os.WriteFile(path, []byte(content), 0o644); err != nil { 14 + t.Fatalf("write seed file: %v", err) 15 + } 16 + 17 + entries, err := parseSeedFile(path) 18 + if err != nil { 19 + t.Fatalf("parse seed file: %v", err) 20 + } 21 + if len(entries) != 2 { 22 + t.Fatalf("entries length: got %d want 2", len(entries)) 23 + } 24 + if !entries[0].isDID { 25 + t.Fatalf("first entry should be did") 26 + } 27 + if entries[1].isDID { 28 + t.Fatalf("second entry should be handle") 29 + } 30 + } 31 + 32 + func TestParseSeedInput_FilePath(t *testing.T) { 33 + dir := t.TempDir() 34 + path := filepath.Join(dir, "seeds.txt") 35 + content := "did:plc:one\nhandle.example\n" 36 + if err := os.WriteFile(path, []byte(content), 0o644); err != nil { 37 + t.Fatalf("write seed file: %v", err) 38 + } 39 + 40 + entries, err := parseSeedInput(path) 41 + if err != nil { 42 + t.Fatalf("parse seed input from file: %v", err) 43 + } 44 + if len(entries) != 2 { 45 + t.Fatalf("entries length: got %d want 2", len(entries)) 46 + } 47 + } 48 + 49 + func TestParseSeedInput_CommaSeparated(t *testing.T) { 50 + entries, err := parseSeedInput("anirudh.fi, atprotocol.dev, did:plc:abc") 51 + if err != nil { 52 + t.Fatalf("parse comma-separated seeds: %v", err) 53 + } 54 + if len(entries) != 3 { 55 + t.Fatalf("entries length: got %d want 3", len(entries)) 56 + } 57 + if !entries[2].isDID { 58 + t.Fatalf("expected third entry to be did") 59 + } 60 + } 61 + 62 + func TestParseSeedInput_SingleInline(t *testing.T) { 63 + entries, err := parseSeedInput("tangled.org") 64 + if err != nil { 65 + t.Fatalf("parse single inline seed: %v", err) 66 + } 67 + if len(entries) != 1 { 68 + t.Fatalf("entries length: got %d want 1", 
len(entries)) 69 + } 70 + if entries[0].raw != "tangled.org" { 71 + t.Fatalf("entry: got %q", entries[0].raw) 72 + } 73 + }
+128
packages/api/internal/backfill/tap_admin.go
// tapAdmin is the Tap admin surface the backfill needs: tracking checks and
// repo registration.
type tapAdmin interface {
	IsTracked(ctx context.Context, did string) (bool, error)
	AddRepos(ctx context.Context, dids []string) error
}

// HTTPTapAdmin calls Tap admin endpoints for backfill orchestration.
type HTTPTapAdmin struct {
	baseURL  string
	password string
	client   *http.Client
}

// NewHTTPTapAdmin builds a client from tapURL (ws/wss URLs and /channel
// suffixes are normalized to the HTTP admin base) and an optional admin
// password for basic auth.
func NewHTTPTapAdmin(tapURL, password string) (*HTTPTapAdmin, error) {
	baseURL, err := normalizeTapBaseURL(tapURL)
	if err != nil {
		return nil, err
	}
	return &HTTPTapAdmin{
		baseURL:  baseURL,
		password: password,
		client: &http.Client{
			Timeout: 15 * time.Second,
		},
	}, nil
}

// IsTracked reports whether Tap already knows did. A 404 means "not tracked";
// any other non-2xx status is an error.
func (t *HTTPTapAdmin) IsTracked(ctx context.Context, did string) (bool, error) {
	endpoint := fmt.Sprintf("%s/info/%s", t.baseURL, url.PathEscape(did))
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return false, fmt.Errorf("build tap info request: %w", err)
	}
	t.addAuth(req)

	resp, err := t.client.Do(req)
	if err != nil {
		return false, fmt.Errorf("tap info request: %w", err)
	}
	defer drainAndClose(resp)

	if resp.StatusCode == http.StatusNotFound {
		return false, nil
	}
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return false, fmt.Errorf("tap info request failed: status %d", resp.StatusCode)
	}
	return true, nil
}

// AddRepos registers dids with Tap via POST /repos/add. An empty batch is a
// no-op; any non-2xx status is an error.
func (t *HTTPTapAdmin) AddRepos(ctx context.Context, dids []string) error {
	if len(dids) == 0 {
		return nil
	}

	payload, err := json.Marshal(map[string][]string{"dids": dids})
	if err != nil {
		return fmt.Errorf("marshal repos add payload: %w", err)
	}

	endpoint := t.baseURL + "/repos/add"
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("build repos add request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	t.addAuth(req)

	resp, err := t.client.Do(req)
	if err != nil {
		return fmt.Errorf("repos add request: %w", err)
	}
	defer drainAndClose(resp)
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("repos add failed: status %d", resp.StatusCode)
	}
	return nil
}

// drainAndClose consumes any remaining body bytes before closing so the
// transport can reuse the underlying connection.
func drainAndClose(resp *http.Response) {
	_, _ = io.Copy(io.Discard, resp.Body)
	_ = resp.Body.Close()
}

// addAuth sets basic auth with user "admin" when a password is configured.
// Uses the stdlib SetBasicAuth instead of hand-rolled base64 encoding; the
// resulting header is identical.
func (t *HTTPTapAdmin) addAuth(req *http.Request) {
	if t.password == "" {
		return
	}
	req.SetBasicAuth("admin", t.password)
}

// normalizeTapBaseURL converts a Tap URL (possibly the ws/wss websocket
// channel URL) to its HTTP admin base: ws→http, wss→https, query/fragment
// stripped, and trailing "/" and "/channel" path suffixes removed.
func normalizeTapBaseURL(raw string) (string, error) {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return "", fmt.Errorf("tap url is required")
	}
	u, err := url.Parse(raw)
	if err != nil {
		return "", fmt.Errorf("parse tap url: %w", err)
	}
	switch u.Scheme {
	case "ws":
		u.Scheme = "http"
	case "wss":
		u.Scheme = "https"
	case "http", "https":
	default:
		return "", fmt.Errorf("unsupported tap url scheme %q", u.Scheme)
	}

	u.RawQuery = ""
	u.Fragment = ""
	u.Path = strings.TrimSuffix(u.Path, "/")
	u.Path = strings.TrimSuffix(u.Path, "/channel")
	if u.Path == "/" {
		u.Path = ""
	}
	return strings.TrimSuffix(u.String(), "/"), nil
}
+21
packages/api/internal/backfill/types.go
··· 1 + package backfill 2 + 3 + import "time" 4 + 5 + // Options configures a backfill run. 6 + type Options struct { 7 + SeedsPath string 8 + MaxHops int 9 + DryRun bool 10 + Concurrency int 11 + BatchSize int 12 + BatchDelay time.Duration 13 + } 14 + 15 + // DiscoveredUser contains crawl metadata for an included DID. 16 + type DiscoveredUser struct { 17 + DID string 18 + Hop int 19 + Source string 20 + Reason string 21 + }
+32
packages/api/internal/config/config.go
··· 3 3 import ( 4 4 "errors" 5 5 "os" 6 + "path/filepath" 6 7 "strconv" 7 8 "strings" 9 + 10 + "github.com/joho/godotenv" 8 11 ) 9 12 10 13 type Config struct { ··· 32 35 } 33 36 34 37 func Load() (*Config, error) { 38 + loadDotEnv() 39 + 35 40 cfg := &Config{ 36 41 TursoURL: os.Getenv("TURSO_DATABASE_URL"), 37 42 TursoToken: os.Getenv("TURSO_AUTH_TOKEN"), ··· 67 72 return nil, errors.Join(errs...) 68 73 } 69 74 return cfg, nil 75 + } 76 + 77 + func loadDotEnv() { 78 + seen := map[string]bool{} 79 + candidates := make([]string, 0, 8) 80 + 81 + if explicit := strings.TrimSpace(os.Getenv("TWISTER_ENV_FILE")); explicit != "" { 82 + candidates = append(candidates, explicit) 83 + } 84 + 85 + if cwd, err := os.Getwd(); err == nil { 86 + for _, rel := range []string{".env", "../.env", "../../.env"} { 87 + candidates = append(candidates, filepath.Join(cwd, rel)) 88 + } 89 + } 90 + 91 + for _, candidate := range candidates { 92 + if candidate == "" || seen[candidate] { 93 + continue 94 + } 95 + seen[candidate] = true 96 + if _, err := os.Stat(candidate); err != nil { 97 + continue 98 + } 99 + // Load does not override existing process env vars. 100 + _ = godotenv.Load(candidate) 101 + } 70 102 } 71 103 72 104 func envOrDefault(key, def string) string {
+45
packages/api/internal/ingest/ingest.go
··· 6 6 "log/slog" 7 7 "math" 8 8 "strings" 9 + "sync" 9 10 "time" 10 11 11 12 "tangled.org/desertthunder.dev/twister/internal/normalize" ··· 15 16 const ( 16 17 defaultConsumerName = "indexer-tap-v1" 17 18 maxDBRetryBackoff = 5 * time.Second 19 + statusLogInterval = 30 * time.Second 18 20 ) 19 21 20 22 type client interface { ··· 31 33 allowlist allowlist 32 34 consumerName string 33 35 log *slog.Logger 36 + 37 + statusMu sync.Mutex 38 + lastCursor string 39 + processedTick int64 34 40 } 35 41 36 42 func NewRunner(st store.Store, registry *normalize.Registry, tap client, indexedCollections string, log *slog.Logger) *Runner { ··· 49 55 50 56 func (r *Runner) Run(ctx context.Context) error { 51 57 defer r.tap.Close() 58 + 59 + go r.runStatusLogger(ctx) 52 60 53 61 for { 54 62 if ctx.Err() != nil { ··· 225 233 if err := r.tap.AckEvent(ctx, eventID); err != nil { 226 234 return err 227 235 } 236 + r.markProcessed(cursor) 228 237 return nil 238 + } 239 + 240 + func (r *Runner) runStatusLogger(ctx context.Context) { 241 + ticker := time.NewTicker(statusLogInterval) 242 + defer ticker.Stop() 243 + 244 + for { 245 + select { 246 + case <-ctx.Done(): 247 + return 248 + case <-ticker.C: 249 + r.statusMu.Lock() 250 + cursor := r.lastCursor 251 + processed := r.processedTick 252 + r.processedTick = 0 253 + r.statusMu.Unlock() 254 + 255 + docs, err := r.store.CountDocuments(ctx) 256 + if err != nil { 257 + r.log.Warn("indexer status failed", slog.String("error", err.Error())) 258 + continue 259 + } 260 + r.log.Info("indexer status", 261 + slog.String("cursor", cursor), 262 + slog.Int64("events_processed", processed), 263 + slog.Int64("documents", docs), 264 + ) 265 + } 266 + } 267 + } 268 + 269 + func (r *Runner) markProcessed(cursor string) { 270 + r.statusMu.Lock() 271 + r.lastCursor = cursor 272 + r.processedTick++ 273 + r.statusMu.Unlock() 229 274 } 230 275 231 276 type allowlist struct {
+12
packages/api/internal/ingest/ingest_test.go
··· 87 87 return nil 88 88 } 89 89 90 + func (f *fakeStore) GetFollowSubjects(_ context.Context, _ string) ([]string, error) { 91 + return nil, nil 92 + } 93 + 94 + func (f *fakeStore) GetRepoCollaborators(_ context.Context, _ string) ([]string, error) { 95 + return nil, nil 96 + } 97 + 98 + func (f *fakeStore) CountDocuments(_ context.Context) (int64, error) { 99 + return int64(len(f.docs)), nil 100 + } 101 + 90 102 func newRunnerForTest(st *fakeStore, tap *fakeTapClient, indexedCollections string) *Runner { 91 103 logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 92 104 return NewRunner(st, normalize.NewRegistry(), tap, indexedCollections, logger)
+33
packages/api/internal/normalize/follow.go
··· 1 + package normalize 2 + 3 + import "tangled.org/desertthunder.dev/twister/internal/store" 4 + 5 + const collectionFollow = "sh.tangled.graph.follow" 6 + 7 + // FollowAdapter normalizes follow edges for graph-backfill discovery. 8 + type FollowAdapter struct{} 9 + 10 + func (a *FollowAdapter) Collection() string { return collectionFollow } 11 + func (a *FollowAdapter) RecordType() string { return "follow" } 12 + 13 + func (a *FollowAdapter) Searchable(_ map[string]any) bool { return false } 14 + 15 + func (a *FollowAdapter) Normalize(event TapRecordEvent) (*store.Document, error) { 16 + r := event.Record 17 + rec := r.Record 18 + subject := str(rec, "subject") 19 + 20 + return &store.Document{ 21 + ID: StableID(r.DID, r.Collection, r.RKey), 22 + DID: r.DID, 23 + Collection: r.Collection, 24 + RKey: r.RKey, 25 + ATURI: BuildATURI(r.DID, r.Collection, r.RKey), 26 + CID: r.CID, 27 + RecordType: a.RecordType(), 28 + Title: subject, 29 + RepoDID: subject, 30 + TagsJSON: "[]", 31 + CreatedAt: str(rec, "createdAt"), 32 + }, nil 33 + }
+50
packages/api/internal/normalize/issue_comment.go
··· 1 + package normalize 2 + 3 + import ( 4 + "fmt" 5 + 6 + "tangled.org/desertthunder.dev/twister/internal/store" 7 + ) 8 + 9 + const collectionIssueComment = "sh.tangled.repo.issue.comment" 10 + 11 + // IssueCommentAdapter normalizes issue comments for collaborator discovery. 12 + type IssueCommentAdapter struct{} 13 + 14 + func (a *IssueCommentAdapter) Collection() string { return collectionIssueComment } 15 + func (a *IssueCommentAdapter) RecordType() string { return "issue_comment" } 16 + 17 + func (a *IssueCommentAdapter) Searchable(record map[string]any) bool { 18 + return str(record, "body") != "" 19 + } 20 + 21 + func (a *IssueCommentAdapter) Normalize(event TapRecordEvent) (*store.Document, error) { 22 + r := event.Record 23 + rec := r.Record 24 + body := str(rec, "body") 25 + issueURI := str(rec, "issue") 26 + 27 + repoDID := "" 28 + if issueURI != "" { 29 + did, _, _, err := ParseATURI(issueURI) 30 + if err != nil { 31 + return nil, fmt.Errorf("issue comment issue AT-URI: %w", err) 32 + } 33 + repoDID = did 34 + } 35 + 36 + return &store.Document{ 37 + ID: StableID(r.DID, r.Collection, r.RKey), 38 + DID: r.DID, 39 + Collection: r.Collection, 40 + RKey: r.RKey, 41 + ATURI: BuildATURI(r.DID, r.Collection, r.RKey), 42 + CID: r.CID, 43 + RecordType: a.RecordType(), 44 + Body: body, 45 + Summary: truncate(body, 200), 46 + RepoDID: repoDID, 47 + TagsJSON: "[]", 48 + CreatedAt: str(rec, "createdAt"), 49 + }, nil 50 + }
+63
packages/api/internal/normalize/normalize_test.go
// ··· unchanged lines elided in this diff hunk ···
}

// TestFollowAdapter verifies the follow edge lands the followed DID in
// RepoDID (the column read by graph discovery) and is excluded from search.
func TestFollowAdapter(t *testing.T) {
	event := loadFixture(t, "follow.json")
	adapter := &normalize.FollowAdapter{}

	doc, err := adapter.Normalize(event)
	if err != nil {
		t.Fatalf("Normalize: %v", err)
	}

	if doc.RecordType != "follow" {
		t.Errorf("RecordType = %q", doc.RecordType)
	}
	if doc.RepoDID != "did:plc:bob" {
		t.Errorf("RepoDID = %q, want did:plc:bob", doc.RepoDID)
	}
	if adapter.Searchable(event.Record.Record) {
		t.Error("Searchable = true, want false")
	}
}

// TestIssueCommentAdapter verifies RepoDID comes from the issue AT-URI
// authority and non-empty bodies are searchable.
func TestIssueCommentAdapter(t *testing.T) {
	event := loadFixture(t, "issue_comment.json")
	adapter := &normalize.IssueCommentAdapter{}

	doc, err := adapter.Normalize(event)
	if err != nil {
		t.Fatalf("Normalize: %v", err)
	}

	if doc.RecordType != "issue_comment" {
		t.Errorf("RecordType = %q", doc.RecordType)
	}
	if doc.RepoDID != "did:plc:repoowner" {
		t.Errorf("RepoDID = %q, want did:plc:repoowner", doc.RepoDID)
	}
	if !adapter.Searchable(event.Record.Record) {
		t.Error("Searchable = false for non-empty comment body")
	}
}

// TestPullCommentAdapter mirrors TestIssueCommentAdapter for pull comments.
func TestPullCommentAdapter(t *testing.T) {
	event := loadFixture(t, "pull_comment.json")
	adapter := &normalize.PullCommentAdapter{}

	doc, err := adapter.Normalize(event)
	if err != nil {
		t.Fatalf("Normalize: %v", err)
	}

	if doc.RecordType != "pull_comment" {
		t.Errorf("RecordType = %q", doc.RecordType)
	}
	if doc.RepoDID != "did:plc:repoowner" {
		t.Errorf("RepoDID = %q, want did:plc:repoowner", doc.RepoDID)
	}
	if !adapter.Searchable(event.Record.Record) {
		t.Error("Searchable = false for non-empty comment body")
	}
}

// TestIssueStateHandler verifies record_state extraction.
func TestIssueStateHandler(t *testing.T) {
	event := loadFixture(t, "issue_state.json")
	// ··· unchanged lines elided in this diff hunk ···
	"sh.tangled.repo",
	"sh.tangled.repo.issue",
	"sh.tangled.repo.pull",
	// New collections registered for backfill discovery.
	"sh.tangled.repo.issue.comment",
	"sh.tangled.repo.pull.comment",
	"sh.tangled.graph.follow",
	"sh.tangled.string",
	"sh.tangled.actor.profile",
}
+50
packages/api/internal/normalize/pull_comment.go
··· 1 + package normalize 2 + 3 + import ( 4 + "fmt" 5 + 6 + "tangled.org/desertthunder.dev/twister/internal/store" 7 + ) 8 + 9 + const collectionPullComment = "sh.tangled.repo.pull.comment" 10 + 11 + // PullCommentAdapter normalizes pull comments for collaborator discovery. 12 + type PullCommentAdapter struct{} 13 + 14 + func (a *PullCommentAdapter) Collection() string { return collectionPullComment } 15 + func (a *PullCommentAdapter) RecordType() string { return "pull_comment" } 16 + 17 + func (a *PullCommentAdapter) Searchable(record map[string]any) bool { 18 + return str(record, "body") != "" 19 + } 20 + 21 + func (a *PullCommentAdapter) Normalize(event TapRecordEvent) (*store.Document, error) { 22 + r := event.Record 23 + rec := r.Record 24 + body := str(rec, "body") 25 + pullURI := str(rec, "pull") 26 + 27 + repoDID := "" 28 + if pullURI != "" { 29 + did, _, _, err := ParseATURI(pullURI) 30 + if err != nil { 31 + return nil, fmt.Errorf("pull comment pull AT-URI: %w", err) 32 + } 33 + repoDID = did 34 + } 35 + 36 + return &store.Document{ 37 + ID: StableID(r.DID, r.Collection, r.RKey), 38 + DID: r.DID, 39 + Collection: r.Collection, 40 + RKey: r.RKey, 41 + ATURI: BuildATURI(r.DID, r.Collection, r.RKey), 42 + CID: r.CID, 43 + RecordType: a.RecordType(), 44 + Body: body, 45 + Summary: truncate(body, 200), 46 + RepoDID: repoDID, 47 + TagsJSON: "[]", 48 + CreatedAt: str(rec, "createdAt"), 49 + }, nil 50 + }
+3
packages/api/internal/normalize/registry.go
··· 16 16 &RepoAdapter{}, 17 17 &IssueAdapter{}, 18 18 &PullAdapter{}, 19 + &IssueCommentAdapter{}, 20 + &PullCommentAdapter{}, 19 21 &StringAdapter{}, 22 + &FollowAdapter{}, 20 23 &ProfileAdapter{}, 21 24 } { 22 25 r.adapters[a.Collection()] = a
+14
packages/api/internal/normalize/testdata/follow.json
··· 1 + { 2 + "id": 2001, 3 + "type": "record", 4 + "record": { 5 + "live": true, 6 + "rev": "3kb3follow1", 7 + "did": "did:plc:alice", 8 + "collection": "sh.tangled.graph.follow", 9 + "rkey": "f1", 10 + "action": "create", 11 + "cid": "bafyreifollow", 12 + "record": { "$type": "sh.tangled.graph.follow", "subject": "did:plc:bob", "createdAt": "2026-03-20T12:00:00.000Z" } 13 + } 14 + }
+19
packages/api/internal/normalize/testdata/issue_comment.json
··· 1 + { 2 + "id": 2002, 3 + "type": "record", 4 + "record": { 5 + "live": true, 6 + "rev": "3kb3ic1", 7 + "did": "did:plc:commenter", 8 + "collection": "sh.tangled.repo.issue.comment", 9 + "rkey": "ic1", 10 + "action": "create", 11 + "cid": "bafyreic1", 12 + "record": { 13 + "$type": "sh.tangled.repo.issue.comment", 14 + "issue": "at://did:plc:repoowner/sh.tangled.repo.issue/issue1", 15 + "body": "I can help with this fix.", 16 + "createdAt": "2026-03-20T13:00:00.000Z" 17 + } 18 + } 19 + }
+19
packages/api/internal/normalize/testdata/pull_comment.json
··· 1 + { 2 + "id": 2003, 3 + "type": "record", 4 + "record": { 5 + "live": true, 6 + "rev": "3kb3pc1", 7 + "did": "did:plc:reviewer", 8 + "collection": "sh.tangled.repo.pull.comment", 9 + "rkey": "pc1", 10 + "action": "create", 11 + "cid": "bafyreipc1", 12 + "record": { 13 + "$type": "sh.tangled.repo.pull.comment", 14 + "pull": "at://did:plc:repoowner/sh.tangled.repo.pull/pull1", 15 + "body": "Looks good to me.", 16 + "createdAt": "2026-03-20T14:00:00.000Z" 17 + } 18 + } 19 + }
+9 -2
packages/api/internal/store/db.go
··· 15 15 //go:embed migrations/*.sql 16 16 var migrationsFS embed.FS 17 17 18 + var extensionMigrationNoticeLogged bool 19 + 18 20 // Open establishes a connection to the database. 19 21 // For remote Turso URLs (libsql:// or https://) it uses the libsql-client-go driver. 20 22 // For local file: URLs it uses the pure-Go SQLite driver (no CGo required). ··· 73 75 if _, err := db.Exec(stmt); err != nil { 74 76 upper := strings.ToUpper(stmt) 75 77 if strings.Contains(upper, "USING FTS") || strings.Contains(upper, "LIBSQL_VECTOR_IDX") { 76 - slog.Warn("migration: skipping extension index (not supported in this environment)", 77 - "migration", name, "err", err) 78 + if !extensionMigrationNoticeLogged { 79 + extensionMigrationNoticeLogged = true 80 + slog.Info("migration: skipping Turso extension indexes in this environment", 81 + "migration", name, 82 + "reason", "database engine does not support Turso-specific FTS/vector DDL", 83 + ) 84 + } 78 85 continue 79 86 } 80 87 return fmt.Errorf("migration %s: exec failed: %w\nstatement: %s", name, err, stmt)
+73
packages/api/internal/store/sql_store.go
// ··· end of preceding method (unchanged) elided in this diff hunk ···
	return nil
}

// GetFollowSubjects returns the distinct DIDs the given user follows. It
// reads follow documents authored by did, whose repo_did column holds the
// followed DID (the FollowAdapter convention). Soft-deleted rows and empty
// subjects are excluded.
func (s *SQLStore) GetFollowSubjects(ctx context.Context, did string) ([]string, error) {
	rows, err := s.db.QueryContext(ctx, `
		SELECT DISTINCT repo_did
		FROM documents
		WHERE did = ?
		  AND collection = 'sh.tangled.graph.follow'
		  AND deleted_at IS NULL
		  AND repo_did IS NOT NULL
		  AND repo_did != ''`,
		did,
	)
	if err != nil {
		return nil, fmt.Errorf("get follow subjects: %w", err)
	}
	defer rows.Close()

	var subjects []string
	for rows.Next() {
		var subject string
		if err := rows.Scan(&subject); err != nil {
			return nil, fmt.Errorf("scan follow subject: %w", err)
		}
		subjects = append(subjects, subject)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate follow subjects: %w", err)
	}
	return subjects, nil
}

// GetRepoCollaborators returns the distinct DIDs (excluding the owner
// itself) that authored live issues, pulls, or comments against repos owned
// by repoOwnerDID.
func (s *SQLStore) GetRepoCollaborators(ctx context.Context, repoOwnerDID string) ([]string, error) {
	rows, err := s.db.QueryContext(ctx, `
		SELECT DISTINCT did
		FROM documents
		WHERE repo_did = ?
		  AND did != ?
		  AND deleted_at IS NULL
		  AND collection IN (
			'sh.tangled.repo.issue',
			'sh.tangled.repo.pull',
			'sh.tangled.repo.issue.comment',
			'sh.tangled.repo.pull.comment'
		  )`,
		repoOwnerDID, repoOwnerDID,
	)
	if err != nil {
		return nil, fmt.Errorf("get repo collaborators: %w", err)
	}
	defer rows.Close()

	var collaborators []string
	for rows.Next() {
		var collaborator string
		if err := rows.Scan(&collaborator); err != nil {
			return nil, fmt.Errorf("scan collaborator: %w", err)
		}
		collaborators = append(collaborators, collaborator)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate collaborators: %w", err)
	}
	return collaborators, nil
}

// CountDocuments returns the number of live (not soft-deleted) documents;
// used by the indexer's periodic status log.
func (s *SQLStore) CountDocuments(ctx context.Context) (int64, error) {
	var n int64
	err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM documents WHERE deleted_at IS NULL`).Scan(&n)
	if err != nil {
		return 0, fmt.Errorf("count documents: %w", err)
	}
	return n, nil
}

func scanDocument(row *sql.Row) (*Document, error) {
	doc := &Document{}
	var (
// ··· remainder of scanDocument elided in this diff hunk ···
+3
packages/api/internal/store/store.go
··· 51 51 UpsertIdentityHandle(ctx context.Context, did, handle string, isActive bool, status string) error 52 52 GetIdentityHandle(ctx context.Context, did string) (string, error) 53 53 EnqueueEmbeddingJob(ctx context.Context, documentID string) error 54 + GetFollowSubjects(ctx context.Context, did string) ([]string, error) 55 + GetRepoCollaborators(ctx context.Context, repoOwnerDID string) ([]string, error) 56 + CountDocuments(ctx context.Context) (int64, error) 54 57 }
+79
packages/api/internal/store/store_test.go
// ··· interior of an enclosing test function; earlier subtests elided in this diff hunk ···
			t.Fatalf("last_error: got %q, want NULL", lastError.String)
		}
	})

	// Verifies GetFollowSubjects reads followed DIDs from the repo_did column
	// of follow documents (the FollowAdapter storage convention).
	t.Run("follow subject discovery query", func(t *testing.T) {
		followDoc := &store.Document{
			ID:         "did:plc:owner|sh.tangled.graph.follow|f1",
			DID:        "did:plc:owner",
			Collection: "sh.tangled.graph.follow",
			RKey:       "f1",
			ATURI:      "at://did:plc:owner/sh.tangled.graph.follow/f1",
			CID:        "cid-follow",
			RecordType: "follow",
			RepoDID:    "did:plc:target",
		}
		if err := st.UpsertDocument(ctx, followDoc); err != nil {
			t.Fatalf("upsert follow doc: %v", err)
		}

		subjects, err := st.GetFollowSubjects(ctx, "did:plc:owner")
		if err != nil {
			t.Fatalf("get follow subjects: %v", err)
		}
		if len(subjects) != 1 || subjects[0] != "did:plc:target" {
			t.Fatalf("subjects: got %#v", subjects)
		}
	})

	// Verifies GetRepoCollaborators returns each non-owner author once and
	// excludes the repo owner's own documents.
	t.Run("repo collaborator discovery query", func(t *testing.T) {
		docs := []*store.Document{
			{
				ID:         "did:plc:collab1|sh.tangled.repo.issue|i1",
				DID:        "did:plc:collab1",
				Collection: "sh.tangled.repo.issue",
				RKey:       "i1",
				ATURI:      "at://did:plc:collab1/sh.tangled.repo.issue/i1",
				CID:        "cid-c1",
				RecordType: "issue",
				RepoDID:    "did:plc:owner",
			},
			{
				ID:         "did:plc:collab2|sh.tangled.repo.pull.comment|pc1",
				DID:        "did:plc:collab2",
				Collection: "sh.tangled.repo.pull.comment",
				RKey:       "pc1",
				ATURI:      "at://did:plc:collab2/sh.tangled.repo.pull.comment/pc1",
				CID:        "cid-c2",
				RecordType: "pull_comment",
				RepoDID:    "did:plc:owner",
			},
			{
				// Owner's own issue — must NOT be reported as a collaborator.
				ID:         "did:plc:owner|sh.tangled.repo.issue|i-owner",
				DID:        "did:plc:owner",
				Collection: "sh.tangled.repo.issue",
				RKey:       "i-owner",
				ATURI:      "at://did:plc:owner/sh.tangled.repo.issue/i-owner",
				CID:        "cid-owner",
				RecordType: "issue",
				RepoDID:    "did:plc:owner",
			},
		}
		for _, doc := range docs {
			if err := st.UpsertDocument(ctx, doc); err != nil {
				t.Fatalf("upsert collaborator doc %s: %v", doc.ID, err)
			}
		}

		collaborators, err := st.GetRepoCollaborators(ctx, "did:plc:owner")
		if err != nil {
			t.Fatalf("get collaborators: %v", err)
		}
		if len(collaborators) != 2 {
			t.Fatalf("collaborators length: got %d want 2 (%#v)", len(collaborators), collaborators)
		}
		got := map[string]bool{}
		for _, did := range collaborators {
			got[did] = true
		}
		if !got["did:plc:collab1"] || !got["did:plc:collab2"] {
			t.Fatalf("collaborators: got %#v", collaborators)
		}
	})
}
+43
packages/api/internal/tapclient/tapclient.go
··· 9 9 "math/rand/v2" 10 10 "net/http" 11 11 "strconv" 12 + "strings" 12 13 "sync" 13 14 "time" 14 15 ··· 19 20 const ( 20 21 minReconnectBackoff = 500 * time.Millisecond 21 22 maxReconnectBackoff = 10 * time.Second 23 + keepAliveInterval = 20 * time.Second 24 + keepAliveTimeout = 5 * time.Second 22 25 ) 23 26 24 27 // Client receives Tap events over WebSocket and sends acks after processing. ··· 85 88 payload, _ := json.Marshal(map[string]int64{"id": id}) 86 89 if err := conn.Write(ctx, websocket.MessageText, payload); err == nil { 87 90 return nil 91 + } else if isConnectionWriteError(err) { 92 + c.resetConn(websocket.StatusInternalError, "ack json write failed") 93 + return fmt.Errorf("ack event %d: %w", id, err) 88 94 } 89 95 90 96 c.log.Warn("tap ack json failed; trying plain id", slog.Int64("event_id", id)) ··· 144 150 c.mu.Lock() 145 151 if c.conn == nil { 146 152 c.conn = conn 153 + c.startKeepAlive(conn) 147 154 } else { 148 155 _ = conn.Close(websocket.StatusNormalClosure, "duplicate") 149 156 } ··· 179 186 _ = c.conn.Close(status, reason) 180 187 c.conn = nil 181 188 } 189 + 190 + func (c *Client) startKeepAlive(conn *websocket.Conn) { 191 + go func() { 192 + ticker := time.NewTicker(keepAliveInterval) 193 + defer ticker.Stop() 194 + 195 + for range ticker.C { 196 + c.mu.Lock() 197 + if c.conn != conn { 198 + c.mu.Unlock() 199 + return 200 + } 201 + c.mu.Unlock() 202 + 203 + ctx, cancel := context.WithTimeout(context.Background(), keepAliveTimeout) 204 + err := conn.Ping(ctx) 205 + cancel() 206 + if err != nil { 207 + c.log.Warn("tap keepalive ping failed", slog.String("error", err.Error())) 208 + c.resetConn(websocket.StatusInternalError, "keepalive failed") 209 + return 210 + } 211 + } 212 + }() 213 + } 214 + 215 + func isConnectionWriteError(err error) bool { 216 + if err == nil { 217 + return false 218 + } 219 + msg := strings.ToLower(err.Error()) 220 + return strings.Contains(msg, "broken pipe") || 221 + strings.Contains(msg, "connection reset") || 
222 + strings.Contains(msg, "closed network connection") || 223 + strings.Contains(msg, "i/o timeout") 224 + }
+72 -3
packages/api/main.go
··· 7 7 "os" 8 8 "os/signal" 9 9 "syscall" 10 + "time" 10 11 11 12 "github.com/spf13/cobra" 13 + "tangled.org/desertthunder.dev/twister/internal/backfill" 12 14 "tangled.org/desertthunder.dev/twister/internal/config" 13 15 "tangled.org/desertthunder.dev/twister/internal/ingest" 14 16 "tangled.org/desertthunder.dev/twister/internal/normalize" ··· 24 26 25 27 func main() { 26 28 root := &cobra.Command{ 27 - Use: "twister", 28 - Short: "Tangled search service", 29 - Version: fmt.Sprintf("%s (%s)", version, commit), 29 + Use: "twister", 30 + Short: "Tangled search service", 31 + Version: fmt.Sprintf("%s (%s)", version, commit), 32 + SilenceUsage: true, 33 + SilenceErrors: true, 30 34 } 31 35 32 36 root.AddCommand( 33 37 newAPICmd(), 34 38 newIndexerCmd(), 39 + newBackfillCmd(), 35 40 newEmbedWorkerCmd(), 36 41 newReindexCmd(), 37 42 newReembedCmd(), ··· 39 44 ) 40 45 41 46 if err := root.Execute(); err != nil { 47 + _, _ = fmt.Fprintln(os.Stderr, "Error:", err) 42 48 os.Exit(1) 43 49 } 44 50 } ··· 136 142 return nil 137 143 }, 138 144 } 145 + } 146 + 147 + func newBackfillCmd() *cobra.Command { 148 + var opts backfill.Options 149 + 150 + cmd := &cobra.Command{ 151 + Use: "backfill", 152 + Short: "Discover users from seeds and register repos for Tap backfill", 153 + RunE: func(cmd *cobra.Command, args []string) error { 154 + cfg, err := config.Load() 155 + if err != nil { 156 + return fmt.Errorf("config: %w", err) 157 + } 158 + log := observability.NewLogger(cfg) 159 + log.Info("starting backfill", slog.String("service", "backfill"), slog.String("version", version)) 160 + 161 + if cfg.TapURL == "" { 162 + return fmt.Errorf("TAP_URL is required for backfill") 163 + } 164 + 165 + db, err := store.Open(cfg.TursoURL, cfg.TursoToken) 166 + if err != nil { 167 + return fmt.Errorf("open database: %w", err) 168 + } 169 + defer db.Close() 170 + 171 + if err := store.Migrate(db); err != nil { 172 + return fmt.Errorf("migrate database: %w", err) 173 + } 174 + 175 + tapAdmin, err 
:= backfill.NewHTTPTapAdmin(cfg.TapURL, cfg.TapAuthPassword) 176 + if err != nil { 177 + return fmt.Errorf("tap admin client: %w", err) 178 + } 179 + 180 + runner := backfill.NewRunner( 181 + store.New(db), 182 + tapAdmin, 183 + backfill.NewHTTPHandleResolver(""), 184 + log, 185 + ) 186 + 187 + ctx, cancel := baseContext() 188 + defer cancel() 189 + 190 + if err := runner.Run(ctx, opts); err != nil { 191 + return fmt.Errorf("run backfill: %w", err) 192 + } 193 + 194 + log.Info("shutting down backfill") 195 + return nil 196 + }, 197 + } 198 + 199 + cmd.Flags().StringVar(&opts.SeedsPath, "seeds", "", "Seed source: file path or comma-separated DIDs/handles (required)") 200 + cmd.Flags().IntVar(&opts.MaxHops, "max-hops", 2, "Max fan-out depth from seeds") 201 + cmd.Flags().BoolVar(&opts.DryRun, "dry-run", false, "Print discovery plan without mutating Tap") 202 + cmd.Flags().IntVar(&opts.Concurrency, "concurrency", 5, "Parallel discovery workers") 203 + cmd.Flags().IntVar(&opts.BatchSize, "batch-size", 10, "DIDs per /repos/add request") 204 + cmd.Flags().DurationVar(&opts.BatchDelay, "batch-delay", time.Second, "Delay between Tap /repos/add batches") 205 + _ = cmd.MarkFlagRequired("seeds") 206 + 207 + return cmd 139 208 } 140 209 141 210 func newReindexCmd() *cobra.Command {