a love letter to tangled (android, iOS, and a search API)
19
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: backfill profile records at atprotocol level

+379 -12
+144 -7
packages/api/internal/backfill/backfill.go
··· 7 7 "sort" 8 8 "sync" 9 9 "time" 10 + 11 + "tangled.org/desertthunder.dev/twister/internal/store" 10 12 ) 11 13 12 14 type discoveryStore interface { 13 15 GetRepoCollaborators(ctx context.Context, repoOwnerDID string) ([]string, error) 16 + UpsertIdentityHandle(ctx context.Context, did, handle string, isActive bool, status string) error 17 + UpsertDocument(ctx context.Context, doc *store.Document) error 14 18 } 15 19 16 20 // Runner executes seed resolution, graph discovery, and Tap registration. ··· 19 23 tap tapAdmin 20 24 resolver handleResolver 21 25 follows followFetcher 26 + profiles profileFetcher 22 27 log *slog.Logger 23 28 } 24 29 25 30 func NewRunner(store discoveryStore, tap tapAdmin, resolver handleResolver, log *slog.Logger) *Runner { 26 - return NewRunnerWithDeps(store, tap, resolver, NewHTTPFollowFetcher(), log) 31 + return NewRunnerWithDeps(store, tap, resolver, NewHTTPFollowFetcher(), NewHTTPProfileFetcher(), log) 27 32 } 28 33 29 - func NewRunnerWithDeps(store discoveryStore, tap tapAdmin, resolver handleResolver, follows followFetcher, log *slog.Logger) *Runner { 34 + func NewRunnerWithDeps(store discoveryStore, tap tapAdmin, resolver handleResolver, follows followFetcher, profiles profileFetcher, log *slog.Logger) *Runner { 30 35 if log == nil { 31 36 log = slog.Default() 32 37 } 33 38 if follows == nil { 34 39 follows = NewHTTPFollowFetcher() 35 40 } 36 - return &Runner{store: store, tap: tap, resolver: resolver, follows: follows, log: log} 41 + if profiles == nil { 42 + profiles = NewHTTPProfileFetcher() 43 + } 44 + return &Runner{store: store, tap: tap, resolver: resolver, follows: follows, profiles: profiles, log: log} 37 45 } 38 46 39 47 func (r *Runner) Run(ctx context.Context, opts Options) error { ··· 57 65 if err != nil { 58 66 return err 59 67 } 60 - seeds, err := r.resolveSeeds(ctx, seedEntries) 68 + seeds, seedHandles, err := r.resolveSeeds(ctx, seedEntries) 61 69 if err != nil { 62 70 return err 63 71 } ··· 167 175 slog.Int("status_failures", statusFailures), 168 176 slog.Int("submit_failures", submitFailures), 169 177 ) 178 + 179 + if err := r.indexProfiles(ctx, discovered, seedHandles, opts.Concurrency); err != nil { 180 + return fmt.Errorf("index profiles: %w", err) 181 + } 182 + 170 183 return nil 171 184 } 172 185 173 - func (r *Runner) resolveSeeds(ctx context.Context, entries []seedEntry) ([]string, error) { 186 + // resolveSeeds returns (dids, did→handle map, error). The handle map contains 187 + // entries for seeds that were specified as handles rather than DIDs. 188 + func (r *Runner) resolveSeeds(ctx context.Context, entries []seedEntry) ([]string, map[string]string, error) { 174 189 seen := map[string]bool{} 175 190 seeds := make([]string, 0, len(entries)) 191 + handles := make(map[string]string) // did → handle 176 192 for _, entry := range entries { 177 193 if entry.isDID { 178 194 seen[entry.raw] = true ··· 181 197 } 182 198 did, err := r.resolver.Resolve(ctx, entry.raw) 183 199 if err != nil { 184 - return nil, fmt.Errorf("resolve handle at line %d (%s): %w", entry.lineNo, entry.raw, err) 200 + return nil, nil, fmt.Errorf("resolve handle at line %d (%s): %w", entry.lineNo, entry.raw, err) 185 201 } 186 202 if seen[did] { 187 203 continue 188 204 } 189 205 seen[did] = true 190 206 seeds = append(seeds, did) 207 + handles[did] = entry.raw 191 208 } 192 - return seeds, nil 209 + return seeds, handles, nil 193 210 } 194 211 195 212 func (r *Runner) discover(ctx context.Context, seeds []string, maxHops int, concurrency int) ([]DiscoveredUser, error) { ··· 300 317 301 318 return ordered, nil 302 319 } 320 + 321 + // indexProfiles fetches sh.tangled.actor.profile records via XRPC for each 322 + // discovered user, persists the DID→handle mapping, and upserts a searchable 323 + // profile document. 324 + func (r *Runner) indexProfiles(ctx context.Context, users []DiscoveredUser, seedHandles map[string]string, concurrency int) error { 325 + if concurrency <= 0 { 326 + concurrency = 5 327 + } 328 + 329 + type result struct { 330 + did string 331 + profile *ProfileRecord 332 + err error 333 + } 334 + 335 + jobs := make(chan string) 336 + results := make(chan result, len(users)) 337 + var wg sync.WaitGroup 338 + for i := 0; i < concurrency; i++ { 339 + wg.Add(1) 340 + go func() { 341 + defer wg.Done() 342 + for did := range jobs { 343 + pr, err := r.profiles.FetchProfile(ctx, did) 344 + results <- result{did: did, profile: pr, err: err} 345 + } 346 + }() 347 + } 348 + 349 + go func() { 350 + for _, u := range users { 351 + jobs <- u.DID 352 + } 353 + close(jobs) 354 + wg.Wait() 355 + close(results) 356 + }() 357 + 358 + indexed := 0 359 + identities := 0 360 + failures := 0 361 + for res := range results { 362 + if res.err != nil { 363 + failures++ 364 + r.log.Warn("profile fetch failed", 365 + slog.String("did", res.did), 366 + slog.String("error", res.err.Error()), 367 + ) 368 + continue 369 + } 370 + 371 + handle := res.profile.Handle 372 + // Prefer the seed handle if the user was specified by handle in seeds. 373 + if h, ok := seedHandles[res.did]; ok && h != "" { 374 + handle = h 375 + } 376 + 377 + if handle != "" { 378 + if err := r.store.UpsertIdentityHandle(ctx, res.did, handle, true, "active"); err != nil { 379 + r.log.Warn("upsert identity handle failed", 380 + slog.String("did", res.did), 381 + slog.String("handle", handle), 382 + slog.String("error", err.Error()), 383 + ) 384 + } else { 385 + identities++ 386 + } 387 + } 388 + 389 + // Only create a document if we got a profile record back. 390 + if res.profile.Record == nil { 391 + continue 392 + } 393 + 394 + description, _ := res.profile.Record["description"].(string) 395 + location, _ := res.profile.Record["location"].(string) 396 + summary := description 397 + if location != "" { 398 + if summary != "" { 399 + summary = summary + " · " + location 400 + } else { 401 + summary = location 402 + } 403 + } 404 + if len(summary) > 200 { 405 + summary = summary[:200] 406 + } 407 + 408 + doc := &store.Document{ 409 + ID: fmt.Sprintf("%s|%s|self", res.did, profileCollection), 410 + DID: res.did, 411 + Collection: profileCollection, 412 + RKey: "self", 413 + ATURI: fmt.Sprintf("at://%s/%s/self", res.did, profileCollection), 414 + CID: res.profile.CID, 415 + RecordType: "profile", 416 + Title: handle, 417 + Body: description, 418 + Summary: summary, 419 + AuthorHandle: handle, 420 + TagsJSON: "[]", 421 + } 422 + 423 + if err := r.store.UpsertDocument(ctx, doc); err != nil { 424 + r.log.Warn("upsert profile document failed", 425 + slog.String("did", res.did), 426 + slog.String("error", err.Error()), 427 + ) 428 + continue 429 + } 430 + indexed++ 431 + } 432 + 433 + r.log.Info("profile indexing complete", 434 + slog.Int("identities_stored", identities), 435 + slog.Int("profiles_indexed", indexed), 436 + slog.Int("failures", failures), 437 + ) 438 + return nil 439 + }
+89 -5
packages/api/internal/backfill/backfill_test.go
··· 9 9 "path/filepath" 10 10 "strings" 11 11 "testing" 12 + 13 + "tangled.org/desertthunder.dev/twister/internal/store" 12 14 ) 13 15 14 16 type fakeStore struct { 15 17 collaborators map[string][]string 18 + identities map[string]string 19 + documents []*store.Document 16 20 } 17 21 18 22 func (f *fakeStore) GetRepoCollaborators(_ context.Context, did string) ([]string, error) { 19 23 return f.collaborators[did], nil 24 + } 25 + 26 + func (f *fakeStore) UpsertIdentityHandle(_ context.Context, did, handle string, _ bool, _ string) error { 27 + if f.identities == nil { 28 + f.identities = map[string]string{} 29 + } 30 + f.identities[did] = handle 31 + return nil 32 + } 33 + 34 + func (f *fakeStore) UpsertDocument(_ context.Context, doc *store.Document) error { 35 + f.documents = append(f.documents, doc) 36 + return nil 20 37 } 21 38 22 39 type fakeFollowFetcher struct { ··· 67 84 return "", io.EOF 68 85 } 69 86 87 + type fakeProfileFetcher struct { 88 + profiles map[string]*ProfileRecord 89 + } 90 + 91 + func (f *fakeProfileFetcher) FetchProfile(_ context.Context, did string) (*ProfileRecord, error) { 92 + if pr, ok := f.profiles[did]; ok { 93 + return pr, nil 94 + } 95 + return &ProfileRecord{}, nil 96 + } 97 + 70 98 func TestRunner_DiscoveryAndSubmit(t *testing.T) { 71 99 st := &fakeStore{ 72 100 collaborators: map[string][]string{ ··· 77 105 tap := &fakeTapAdmin{statuses: map[string]RepoStatus{"did:plc:f1": {Found: true, Tracked: true, Backfilled: true}}} 78 106 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 79 107 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 80 - r := NewRunnerWithDeps(st, tap, resolver, follows, log) 108 + r := NewRunnerWithDeps(st, tap, resolver, follows, &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, log) 81 109 82 110 dir := t.TempDir() 83 111 seedsPath := filepath.Join(dir, "seeds.txt") ··· 109 137 tap := &fakeTapAdmin{statuses: map[string]RepoStatus{}} 110 138 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 111 139 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 112 - r := NewRunnerWithDeps(st, tap, resolver, follows, log) 140 + r := NewRunnerWithDeps(st, tap, resolver, follows, &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, log) 113 141 114 142 dir := t.TempDir() 115 143 seedsPath := filepath.Join(dir, "seeds.txt") ··· 140 168 }} 141 169 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 142 170 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 143 - r := NewRunnerWithDeps(st, tap, resolver, follows, log) 171 + r := NewRunnerWithDeps(st, tap, resolver, follows, &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, log) 144 172 145 173 dir := t.TempDir() 146 174 seedsPath := filepath.Join(dir, "seeds.txt") ··· 170 198 } 171 199 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 172 200 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 173 - r := NewRunnerWithDeps(st, tap, resolver, follows, log) 201 + r := NewRunnerWithDeps(st, tap, resolver, follows, &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, log) 174 202 175 203 dir := t.TempDir() 176 204 seedsPath := filepath.Join(dir, "seeds.txt") ··· 222 250 } 223 251 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 224 252 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 225 - r := NewRunnerWithDeps(st, tap, resolver, follows, log) 253 + r := NewRunnerWithDeps(st, tap, resolver, follows, &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, log) 226 254 227 255 dir := t.TempDir() 228 256 seedsPath := filepath.Join(dir, "seeds.txt") ··· 252 280 } 253 281 } 254 282 } 283 + 284 + func TestRunner_IndexesProfilesAndHandles(t *testing.T) { 285 + st := &fakeStore{collaborators: map[string][]string{}} 286 + follows := &fakeFollowFetcher{follows: map[string][]string{}} 287 + tap := &fakeTapAdmin{statuses: map[string]RepoStatus{}} 288 + resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 289 + profiles := &fakeProfileFetcher{profiles: map[string]*ProfileRecord{ 290 + "did:plc:seed": { 291 + Record: map[string]any{ 292 + "description": "Building cool stuff", 293 + "location": "NYC", 294 + }, 295 + CID: "bafyabc123", 296 + Handle: "alice.tangled.sh", 297 + }, 298 + }} 299 + log := slog.New(slog.NewTextHandler(io.Discard, nil)) 300 + r := NewRunnerWithDeps(st, tap, resolver, follows, profiles, log) 301 + 302 + dir := t.TempDir() 303 + seedsPath := filepath.Join(dir, "seeds.txt") 304 + if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 0o644); err != nil { 305 + t.Fatalf("write seeds: %v", err) 306 + } 307 + 308 + err := r.Run(context.Background(), Options{SeedsPath: seedsPath, MaxHops: 0}) 309 + if err != nil { 310 + t.Fatalf("run backfill: %v", err) 311 + } 312 + 313 + // Identity handle should be persisted. 314 + if st.identities["did:plc:seed"] != "alice.tangled.sh" { 315 + t.Fatalf("expected identity handle for seed DID, got %#v", st.identities) 316 + } 317 + 318 + // Profile document should be created. 319 + if len(st.documents) != 1 { 320 + t.Fatalf("expected 1 profile document, got %d", len(st.documents)) 321 + } 322 + doc := st.documents[0] 323 + if doc.Title != "alice.tangled.sh" { 324 + t.Errorf("expected title to be handle, got %q", doc.Title) 325 + } 326 + if doc.AuthorHandle != "alice.tangled.sh" { 327 + t.Errorf("expected author_handle to be handle, got %q", doc.AuthorHandle) 328 + } 329 + if doc.Body != "Building cool stuff" { 330 + t.Errorf("expected body to be description, got %q", doc.Body) 331 + } 332 + if doc.RecordType != "profile" { 333 + t.Errorf("expected record_type profile, got %q", doc.RecordType) 334 + } 335 + if !strings.Contains(doc.Summary, "NYC") { 336 + t.Errorf("expected summary to contain location, got %q", doc.Summary) 337 + } 338 + }
+146
packages/api/internal/backfill/profile.go
··· 1 + package backfill 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "net/http" 8 + "net/url" 9 + "strings" 10 + "time" 11 + ) 12 + 13 + const profileCollection = "sh.tangled.actor.profile" 14 + 15 + // ProfileRecord holds the fetched profile data and resolved handle. 16 + type ProfileRecord struct { 17 + Record map[string]any 18 + CID string 19 + Handle string 20 + } 21 + 22 + type profileFetcher interface { 23 + FetchProfile(ctx context.Context, did string) (*ProfileRecord, error) 24 + } 25 + 26 + // HTTPProfileFetcher fetches sh.tangled.actor.profile records via XRPC 27 + // and resolves handles from the DID document. 28 + type HTTPProfileFetcher struct { 29 + client *http.Client 30 + } 31 + 32 + func NewHTTPProfileFetcher() *HTTPProfileFetcher { 33 + return &HTTPProfileFetcher{ 34 + client: &http.Client{Timeout: 15 * time.Second}, 35 + } 36 + } 37 + 38 + func (f *HTTPProfileFetcher) FetchProfile(ctx context.Context, did string) (*ProfileRecord, error) { 39 + pds, handle, err := f.resolveDIDDoc(ctx, did) 40 + if err != nil { 41 + return nil, fmt.Errorf("resolve did doc: %w", err) 42 + } 43 + 44 + u, err := url.Parse(strings.TrimSuffix(pds, "/") + "/xrpc/com.atproto.repo.getRecord") 45 + if err != nil { 46 + return nil, fmt.Errorf("build getRecord url: %w", err) 47 + } 48 + q := u.Query() 49 + q.Set("repo", did) 50 + q.Set("collection", profileCollection) 51 + q.Set("rkey", "self") 52 + u.RawQuery = q.Encode() 53 + 54 + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) 55 + if err != nil { 56 + return nil, fmt.Errorf("build getRecord request: %w", err) 57 + } 58 + 59 + resp, err := f.client.Do(req) 60 + if err != nil { 61 + return nil, fmt.Errorf("getRecord request: %w", err) 62 + } 63 + defer resp.Body.Close() 64 + 65 + if resp.StatusCode == http.StatusNotFound { 66 + // No profile record — return handle only so identity can still be stored. 67 + return &ProfileRecord{Handle: handle}, nil 68 + } 69 + if resp.StatusCode != http.StatusOK { 70 + return nil, fmt.Errorf("getRecord failed: status %d", resp.StatusCode) 71 + } 72 + 73 + var payload struct { 74 + CID string `json:"cid"` 75 + Value map[string]any `json:"value"` 76 + } 77 + if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil { 78 + return nil, fmt.Errorf("decode getRecord response: %w", err) 79 + } 80 + 81 + return &ProfileRecord{ 82 + Record: payload.Value, 83 + CID: payload.CID, 84 + Handle: handle, 85 + }, nil 86 + } 87 + 88 + // resolveDIDDoc fetches the DID document and returns (pdsEndpoint, handle, error). 89 + func (f *HTTPProfileFetcher) resolveDIDDoc(ctx context.Context, did string) (string, string, error) { 90 + var didDocURL string 91 + switch { 92 + case strings.HasPrefix(did, "did:plc:"): 93 + didDocURL = plcDirectoryBase + "/" + url.PathEscape(did) 94 + case strings.HasPrefix(did, "did:web:"): 95 + hostAndPath := strings.TrimPrefix(did, "did:web:") 96 + hostAndPath = strings.ReplaceAll(hostAndPath, ":", "/") 97 + didDocURL = "https://" + hostAndPath + "/.well-known/did.json" 98 + default: 99 + return "", "", fmt.Errorf("unsupported did type: %s", did) 100 + } 101 + 102 + req, err := http.NewRequestWithContext(ctx, http.MethodGet, didDocURL, nil) 103 + if err != nil { 104 + return "", "", fmt.Errorf("build did doc request: %w", err) 105 + } 106 + resp, err := f.client.Do(req) 107 + if err != nil { 108 + return "", "", fmt.Errorf("did doc request: %w", err) 109 + } 110 + defer resp.Body.Close() 111 + if resp.StatusCode != http.StatusOK { 112 + return "", "", fmt.Errorf("did doc lookup failed: status %d", resp.StatusCode) 113 + } 114 + 115 + var didDoc struct { 116 + AlsoKnownAs []string `json:"alsoKnownAs"` 117 + Service []struct { 118 + Type string `json:"type"` 119 + ServiceEndpoint string `json:"serviceEndpoint"` 120 + } `json:"service"` 121 + } 122 + if err := json.NewDecoder(resp.Body).Decode(&didDoc); err != nil { 123 + return "", "", fmt.Errorf("decode did doc: %w", err) 124 + } 125 + 126 + var pds string 127 + for _, svc := range didDoc.Service { 128 + if svc.Type == "AtprotoPersonalDataServer" && strings.TrimSpace(svc.ServiceEndpoint) != "" { 129 + pds = strings.TrimSpace(svc.ServiceEndpoint) 130 + break 131 + } 132 + } 133 + if pds == "" { 134 + return "", "", fmt.Errorf("no atproto pds endpoint in did document") 135 + } 136 + 137 + var handle string 138 + for _, aka := range didDoc.AlsoKnownAs { 139 + if strings.HasPrefix(aka, "at://") { 140 + handle = strings.TrimPrefix(aka, "at://") 141 + break 142 + } 143 + } 144 + 145 + return pds, handle, nil 146 + }