a love letter to tangled (Android, iOS, and a search API)
19
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add profile & repo indexing in Lightrail runner

+312 -59
+68 -56
packages/api/internal/backfill/backfill.go
··· 26 26 resolver handleResolver 27 27 follows followFetcher 28 28 profiles profileFetcher 29 + repos repoFetcher 29 30 lightrail lightrailRepoLister 30 31 log *slog.Logger 31 32 } ··· 36 37 NewXRPCHandleResolver(xrpcClient), 37 38 NewXRPCFollowFetcher(xrpcClient), 38 39 NewXRPCProfileFetcher(xrpcClient), 40 + NewXRPCRepoFetcher(xrpcClient), 39 41 NewHTTPLightrailClient(), 40 42 log, 41 43 ) ··· 43 45 44 46 func NewRunnerWithDeps( 45 47 store discoveryStore, tap tapAdmin, resolver handleResolver, 46 - follows followFetcher, profiles profileFetcher, lightrail lightrailRepoLister, 48 + follows followFetcher, profiles profileFetcher, repos repoFetcher, lightrail lightrailRepoLister, 47 49 log *slog.Logger, 48 50 ) *Runner { 49 51 if log == nil { ··· 54 56 } 55 57 return &Runner{ 56 58 store: store, tap: tap, resolver: resolver, follows: follows, 57 - profiles: profiles, lightrail: lightrail, log: log, 59 + profiles: profiles, repos: repos, lightrail: lightrail, log: log, 58 60 } 59 61 } 60 62 ··· 167 169 slog.Int("submit_failures", submitFailures), 168 170 ) 169 171 170 - if err := r.indexProfiles(ctx, discovered, seedHandles, opts.Concurrency); err != nil { 171 - return fmt.Errorf("index profiles: %w", err) 172 + if err := r.bootstrapProfilesAndRepos(ctx, discovered, seedHandles, opts.Concurrency); err != nil { 173 + return fmt.Errorf("bootstrap profiles and repos: %w", err) 172 174 } 173 175 174 176 return nil ··· 218 220 slog.Int("submitted", submitted), 219 221 slog.Int("submit_failures", submitFailures), 220 222 ) 223 + 224 + if err := r.bootstrapProfilesAndRepos(ctx, discovered, nil, opts.Concurrency); err != nil { 225 + return fmt.Errorf("bootstrap profiles and repos: %w", err) 226 + } 227 + 221 228 return nil 222 229 } 223 230 ··· 448 455 return normalized 449 456 } 450 457 451 - // indexProfiles fetches sh.tangled.actor.profile records via XRPC for each 452 - // discovered user, persists the DID→handle mapping, and upserts a searchable 453 - // profile 
document. 454 - func (r *Runner) indexProfiles(ctx context.Context, users []DiscoveredUser, seedHandles map[string]string, concurrency int) error { 458 + // bootstrapProfilesAndRepos fetches actor profiles and repo records via XRPC 459 + // for each discovered user, persists DID→handle mappings, and upserts 460 + // searchable bootstrap documents. 461 + func (r *Runner) bootstrapProfilesAndRepos(ctx context.Context, users []DiscoveredUser, seedHandles map[string]string, concurrency int) error { 455 462 if concurrency <= 0 { 456 463 concurrency = 5 457 464 } 458 465 459 466 type result struct { 460 - did string 461 - profile *ProfileRecord 462 - err error 467 + did string 468 + profile *ProfileRecord 469 + repos []RepoRecord 470 + profileErr error 471 + repoErr error 463 472 } 464 473 465 474 jobs := make(chan string) ··· 470 479 go func() { 471 480 defer wg.Done() 472 481 for did := range jobs { 473 - pr, err := r.profiles.FetchProfile(ctx, did) 474 - results <- result{did: did, profile: pr, err: err} 482 + res := result{did: did} 483 + if r.profiles != nil { 484 + res.profile, res.profileErr = r.profiles.FetchProfile(ctx, did) 485 + } 486 + if r.repos != nil { 487 + res.repos, res.repoErr = r.repos.ListRepos(ctx, did) 488 + } 489 + results <- res 475 490 } 476 491 }() 477 492 } ··· 485 500 close(results) 486 501 }() 487 502 488 - indexed := 0 503 + profilesIndexed := 0 504 + reposIndexed := 0 489 505 identities := 0 490 506 failures := 0 491 507 for res := range results { 492 - if res.err != nil { 508 + if res.profileErr != nil { 493 509 failures++ 494 510 r.log.Warn("profile fetch failed", 495 511 slog.String("did", res.did), 496 - slog.String("error", res.err.Error()), 512 + slog.String("error", res.profileErr.Error()), 497 513 ) 498 - continue 514 + } 515 + if res.repoErr != nil { 516 + failures++ 517 + r.log.Warn("repo list failed", 518 + slog.String("did", res.did), 519 + slog.String("error", res.repoErr.Error()), 520 + ) 499 521 } 500 522 501 - handle := 
res.profile.Handle 523 + handle := "" 524 + if res.profile != nil { 525 + handle = res.profile.Handle 526 + } 502 527 if h, ok := seedHandles[res.did]; ok && h != "" { 503 528 handle = h 504 529 } ··· 515 540 } 516 541 } 517 542 518 - if res.profile.Record == nil { 519 - continue 520 - } 521 - 522 - description, _ := res.profile.Record["description"].(string) 523 - location, _ := res.profile.Record["location"].(string) 524 - summary := description 525 - if location != "" { 526 - if summary != "" { 527 - summary = summary + " · " + location 543 + doc := bootstrapProfileDocument(res.did, res.profile, handle) 544 + if doc != nil { 545 + if err := r.store.UpsertDocument(ctx, doc); err != nil { 546 + r.log.Warn("upsert profile document failed", 547 + slog.String("did", res.did), 548 + slog.String("error", err.Error()), 549 + ) 528 550 } else { 529 - summary = location 551 + profilesIndexed++ 530 552 } 531 553 } 532 - if len(summary) > 200 { 533 - summary = summary[:200] 534 - } 535 554 536 - doc := &store.Document{ 537 - ID: fmt.Sprintf("%s|%s|self", res.did, profileCollection), 538 - DID: res.did, 539 - Collection: profileCollection, 540 - RKey: "self", 541 - ATURI: fmt.Sprintf("at://%s/%s/self", res.did, profileCollection), 542 - CID: res.profile.CID, 543 - RecordType: "profile", 544 - Title: handle, 545 - Body: description, 546 - Summary: summary, 547 - AuthorHandle: handle, 548 - TagsJSON: "[]", 555 + for _, repo := range res.repos { 556 + doc := bootstrapRepoDocument(res.did, handle, repo) 557 + if doc == nil { 558 + continue 559 + } 560 + if err := r.store.UpsertDocument(ctx, doc); err != nil { 561 + r.log.Warn("upsert repo document failed", 562 + slog.String("did", res.did), 563 + slog.String("rkey", repo.RKey), 564 + slog.String("error", err.Error()), 565 + ) 566 + continue 567 + } 568 + reposIndexed++ 549 569 } 550 - 551 - if err := r.store.UpsertDocument(ctx, doc); err != nil { 552 - r.log.Warn("upsert profile document failed", 553 - slog.String("did", 
res.did), 554 - slog.String("error", err.Error()), 555 - ) 556 - continue 557 - } 558 - indexed++ 559 570 } 560 571 561 - r.log.Info("profile indexing complete", 572 + r.log.Info("bootstrap indexing complete", 562 573 slog.Int("identities_stored", identities), 563 - slog.Int("profiles_indexed", indexed), 574 + slog.Int("profiles_indexed", profilesIndexed), 575 + slog.Int("repos_indexed", reposIndexed), 564 576 slog.Int("failures", failures), 565 577 ) 566 578 return nil
+136 -3
packages/api/internal/backfill/backfill_test.go
··· 95 95 return &ProfileRecord{}, nil 96 96 } 97 97 98 + type fakeRepoFetcher struct { 99 + repos map[string][]RepoRecord 100 + } 101 + 102 + func (f *fakeRepoFetcher) ListRepos(_ context.Context, did string) ([]RepoRecord, error) { 103 + if repos, ok := f.repos[did]; ok { 104 + return repos, nil 105 + } 106 + return nil, nil 107 + } 108 + 98 109 type fakeLightrailRepoLister struct { 99 110 dids []string 100 111 err error ··· 130 141 r := NewRunnerWithDeps( 131 142 st, tap, resolver, follows, 132 143 &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, 144 + &fakeRepoFetcher{}, 133 145 &fakeLightrailRepoLister{}, log, 134 146 ) 135 147 ··· 167 179 r := NewRunnerWithDeps( 168 180 st, tap, resolver, follows, 169 181 &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, 182 + &fakeRepoFetcher{}, 170 183 &fakeLightrailRepoLister{}, log, 171 184 ) 172 185 ··· 203 216 r := NewRunnerWithDeps( 204 217 st, tap, resolver, follows, 205 218 &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, 219 + &fakeRepoFetcher{}, 206 220 &fakeLightrailRepoLister{}, log, 207 221 ) 208 222 ··· 239 253 r := NewRunnerWithDeps( 240 254 st, tap, resolver, follows, 241 255 &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, 256 + &fakeRepoFetcher{}, 242 257 &fakeLightrailRepoLister{}, log, 243 258 ) 244 259 ··· 296 311 r := NewRunnerWithDeps( 297 312 st, tap, resolver, follows, 298 313 &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, 314 + &fakeRepoFetcher{}, 299 315 &fakeLightrailRepoLister{}, log, 300 316 ) 301 317 ··· 346 362 }} 347 363 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 348 364 r := NewRunnerWithDeps( 349 - st, tap, resolver, follows, profiles, &fakeLightrailRepoLister{}, log, 365 + st, tap, resolver, follows, profiles, &fakeRepoFetcher{}, &fakeLightrailRepoLister{}, log, 350 366 ) 351 367 352 368 dir := t.TempDir() ··· 395 411 lightrail := &fakeLightrailRepoLister{dids: []string{"did:plc:b", "did:plc:a"}} 396 412 log := 
slog.New(slog.NewTextHandler(io.Discard, nil)) 397 413 r := NewRunnerWithDeps( 398 - st, tap, &fakeResolver{}, &fakeFollowFetcher{}, &fakeProfileFetcher{}, 414 + st, tap, &fakeResolver{}, &fakeFollowFetcher{}, &fakeProfileFetcher{}, &fakeRepoFetcher{}, 399 415 lightrail, log, 400 416 ) 401 417 ··· 432 448 } 433 449 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 434 450 r := NewRunnerWithDeps( 435 - st, tap, &fakeResolver{}, &fakeFollowFetcher{}, &fakeProfileFetcher{}, 451 + st, tap, &fakeResolver{}, &fakeFollowFetcher{}, &fakeProfileFetcher{}, &fakeRepoFetcher{}, 436 452 lightrail, log, 437 453 ) 438 454 ··· 450 466 t.Fatalf("expected deduped DIDs, got %#v", tap.added) 451 467 } 452 468 } 469 + 470 + func TestRunner_LightrailIndexesProfiles(t *testing.T) { 471 + st := &fakeStore{collaborators: map[string][]string{}} 472 + tap := &fakeTapAdmin{} 473 + lightrail := &fakeLightrailRepoLister{ 474 + dids: []string{"did:plc:xg2vq45muivyy3xwatcehspu"}, 475 + } 476 + profiles := &fakeProfileFetcher{profiles: map[string]*ProfileRecord{ 477 + "did:plc:xg2vq45muivyy3xwatcehspu": { 478 + Record: map[string]any{ 479 + "description": "Twisted maintainer", 480 + "location": "Chicago", 481 + }, 482 + CID: "bafydesert123", 483 + Handle: "desertthunder.dev", 484 + }, 485 + }} 486 + log := slog.New(slog.NewTextHandler(io.Discard, nil)) 487 + r := NewRunnerWithDeps( 488 + st, tap, &fakeResolver{}, &fakeFollowFetcher{}, profiles, &fakeRepoFetcher{}, lightrail, log, 489 + ) 490 + 491 + err := r.Run(context.Background(), Options{ 492 + Source: SourceLightrail, 493 + BatchSize: 10, 494 + Concurrency: 1, 495 + Collections: []string{"sh.tangled.repo"}, 496 + }) 497 + if err != nil { 498 + t.Fatalf("run lightrail backfill: %v", err) 499 + } 500 + if st.identities["did:plc:xg2vq45muivyy3xwatcehspu"] != "desertthunder.dev" { 501 + t.Fatalf("expected identity handle to be stored, got %#v", st.identities) 502 + } 503 + if len(st.documents) != 1 { 504 + t.Fatalf("expected one profile 
document, got %d", len(st.documents)) 505 + } 506 + doc := st.documents[0] 507 + if doc.RecordType != "profile" { 508 + t.Fatalf("expected profile document, got %q", doc.RecordType) 509 + } 510 + if doc.AuthorHandle != "desertthunder.dev" { 511 + t.Fatalf("expected author_handle desertthunder.dev, got %q", doc.AuthorHandle) 512 + } 513 + if doc.Title != "desertthunder.dev" { 514 + t.Fatalf("expected title desertthunder.dev, got %q", doc.Title) 515 + } 516 + } 517 + 518 + func TestRunner_LightrailIndexesReposDirectly(t *testing.T) { 519 + st := &fakeStore{collaborators: map[string][]string{}} 520 + tap := &fakeTapAdmin{} 521 + lightrail := &fakeLightrailRepoLister{ 522 + dids: []string{"did:plc:xg2vq45muivyy3xwatcehspu"}, 523 + } 524 + profiles := &fakeProfileFetcher{profiles: map[string]*ProfileRecord{ 525 + "did:plc:xg2vq45muivyy3xwatcehspu": { 526 + Handle: "desertthunder.dev", 527 + Record: map[string]any{ 528 + "description": "Twisted maintainer", 529 + }, 530 + CID: "bafydesert123", 531 + }, 532 + }} 533 + repos := &fakeRepoFetcher{repos: map[string][]RepoRecord{ 534 + "did:plc:xg2vq45muivyy3xwatcehspu": { 535 + { 536 + RKey: "3mho6hukiei22", 537 + CID: "bafyreitwisted123", 538 + Record: map[string]any{ 539 + "name": "twisted", 540 + "description": "A tangled mobile client", 541 + "topics": []any{"go", "search"}, 542 + "createdAt": "2026-03-01T00:00:00Z", 543 + }, 544 + }, 545 + }, 546 + }} 547 + log := slog.New(slog.NewTextHandler(io.Discard, nil)) 548 + r := NewRunnerWithDeps( 549 + st, tap, &fakeResolver{}, &fakeFollowFetcher{}, profiles, repos, lightrail, log, 550 + ) 551 + 552 + err := r.Run(context.Background(), Options{ 553 + Source: SourceLightrail, 554 + BatchSize: 10, 555 + Concurrency: 1, 556 + Collections: []string{"sh.tangled.repo"}, 557 + }) 558 + if err != nil { 559 + t.Fatalf("run lightrail backfill: %v", err) 560 + } 561 + 562 + if len(st.documents) != 2 { 563 + t.Fatalf("expected profile and repo bootstrap documents, got %d", 
len(st.documents)) 564 + } 565 + 566 + var foundRepo *store.Document 567 + for _, doc := range st.documents { 568 + if doc.RecordType == "repo" { 569 + foundRepo = doc 570 + break 571 + } 572 + } 573 + if foundRepo == nil { 574 + t.Fatal("expected repo bootstrap document") 575 + } 576 + if foundRepo.Title != "twisted" { 577 + t.Fatalf("expected repo title twisted, got %q", foundRepo.Title) 578 + } 579 + if foundRepo.AuthorHandle != "desertthunder.dev" { 580 + t.Fatalf("expected repo author_handle desertthunder.dev, got %q", foundRepo.AuthorHandle) 581 + } 582 + if foundRepo.WebURL == "" { 583 + t.Fatal("expected repo web_url to be populated") 584 + } 585 + }
+108
packages/api/internal/backfill/profile.go
··· 5 5 "errors" 6 6 "fmt" 7 7 8 + "tangled.org/desertthunder.dev/twister/internal/normalize" 9 + "tangled.org/desertthunder.dev/twister/internal/store" 8 10 "tangled.org/desertthunder.dev/twister/internal/xrpc" 9 11 ) 10 12 11 13 const profileCollection = "sh.tangled.actor.profile" 14 + const repoCollection = "sh.tangled.repo" 12 15 13 16 // ProfileRecord holds the fetched profile data and resolved handle. 14 17 type ProfileRecord struct { ··· 19 22 20 23 type profileFetcher interface { 21 24 FetchProfile(ctx context.Context, did string) (*ProfileRecord, error) 25 + } 26 + 27 + type RepoRecord struct { 28 + RKey string 29 + CID string 30 + Record map[string]any 31 + } 32 + 33 + type repoFetcher interface { 34 + ListRepos(ctx context.Context, did string) ([]RepoRecord, error) 22 35 } 23 36 24 37 // XRPCProfileFetcher fetches sh.tangled.actor.profile records via xrpc.Client ··· 52 65 Handle: info.Handle, 53 66 }, nil 54 67 } 68 + 69 + type XRPCRepoFetcher struct { 70 + client *xrpc.Client 71 + } 72 + 73 + func NewXRPCRepoFetcher(client *xrpc.Client) *XRPCRepoFetcher { 74 + return &XRPCRepoFetcher{client: client} 75 + } 76 + 77 + func (f *XRPCRepoFetcher) ListRepos(ctx context.Context, did string) ([]RepoRecord, error) { 78 + info, err := f.client.ResolveIdentity(ctx, did) 79 + if err != nil { 80 + return nil, fmt.Errorf("resolve identity: %w", err) 81 + } 82 + 83 + records, err := f.client.ListAllRecords(ctx, info.PDS, did, repoCollection) 84 + if err != nil { 85 + return nil, fmt.Errorf("list repos: %w", err) 86 + } 87 + 88 + repos := make([]RepoRecord, 0, len(records)) 89 + for _, rec := range records { 90 + _, _, rkey, err := normalize.ParseATURI(rec.URI) 91 + if err != nil { 92 + continue 93 + } 94 + repos = append(repos, RepoRecord{ 95 + RKey: rkey, 96 + CID: rec.CID, 97 + Record: rec.Value, 98 + }) 99 + } 100 + return repos, nil 101 + } 102 + 103 + func bootstrapProfileDocument(did string, profile *ProfileRecord, handle string) *store.Document { 104 + if 
profile == nil || profile.Record == nil { 105 + return nil 106 + } 107 + 108 + description, _ := profile.Record["description"].(string) 109 + location, _ := profile.Record["location"].(string) 110 + summary := description 111 + if location != "" { 112 + if summary != "" { 113 + summary = summary + " · " + location 114 + } else { 115 + summary = location 116 + } 117 + } 118 + if len(summary) > 200 { 119 + summary = summary[:200] 120 + } 121 + 122 + return &store.Document{ 123 + ID: fmt.Sprintf("%s|%s|self", did, profileCollection), 124 + DID: did, 125 + Collection: profileCollection, 126 + RKey: "self", 127 + ATURI: fmt.Sprintf("at://%s/%s/self", did, profileCollection), 128 + CID: profile.CID, 129 + RecordType: "profile", 130 + Title: handle, 131 + Body: description, 132 + Summary: summary, 133 + AuthorHandle: handle, 134 + TagsJSON: "[]", 135 + } 136 + } 137 + 138 + func bootstrapRepoDocument(did, handle string, repo RepoRecord) *store.Document { 139 + adapter := &normalize.RepoAdapter{} 140 + if !adapter.Searchable(repo.Record) { 141 + return nil 142 + } 143 + 144 + event := normalize.TapRecordEvent{ 145 + Type: "record", 146 + Record: &normalize.TapRecord{ 147 + DID: did, 148 + Collection: repoCollection, 149 + RKey: repo.RKey, 150 + CID: repo.CID, 151 + Record: repo.Record, 152 + }, 153 + } 154 + 155 + doc, err := adapter.Normalize(event) 156 + if err != nil { 157 + return nil 158 + } 159 + doc.AuthorHandle = handle 160 + doc.WebURL = xrpc.BuildWebURL(handle, doc.RepoName, doc.RecordType, doc.RKey) 161 + return doc 162 + }