tangled vouch map with historical data
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: update backfill again to use repo query

+242 -58
+231 -58
cmd/backfill/main.go
··· 11 11 "net/http" 12 12 "os" 13 13 "os/signal" 14 + "strconv" 14 15 "strings" 15 16 "sync" 16 17 "sync/atomic" ··· 25 26 VouchCollection = "sh.tangled.graph.vouch" 26 27 FollowCollection = "sh.tangled.graph.follow" 27 28 KnotMemberCollection = "sh.tangled.knot.member" 29 + TangledProfileCollection = "sh.tangled.actor.profile" 28 30 DefaultKnotDID = "did:plc:wshs7t2adsemcrrd4snkeqli" 31 + ListReposByCollectionURL = "https://lightrail.microcosm.blue/xrpc/com.atproto.sync.listReposByCollection" 29 32 ListRecordsLimit = 100 30 - MaxWorkers = 20 33 + MaxWorkers = 5 31 34 ProfileBatchSize = 50 32 35 ProfileInterval = 30 * time.Second 33 - MaxRetries = 3 36 + MaxRetries = 5 34 37 RetryBaseDelay = 2 * time.Second 38 + RateLimitDelay = 10 * time.Second 35 39 ) 36 40 37 41 var httpClient = &http.Client{ ··· 69 73 func main() { 70 74 dbPath := flag.String("db", "tangle.db", "path to sqlite database") 71 75 workers := flag.Int("workers", MaxWorkers, "number of parallel workers") 72 - knotDID := flag.String("knot", DefaultKnotDID, "default knot DID to seed members from") 76 + knotDID := flag.String("knot", DefaultKnotDID, "fallback knot DID to seed members from") 73 77 flag.Parse() 74 78 args := flag.Args() 75 79 ··· 96 100 }() 97 101 98 102 seedDIDs := args 99 - if len(seedDIDs) == 0 { 100 - seedDIDs = bootstrapFromDB(store) 101 - } 102 103 103 - slog.Info("seeding from knot members", "knot", *knotDID) 104 - knotMembers, err := fetchKnotMembers(ctx, *knotDID) 104 + // primary: crawl all tangled DIDs via listReposByCollection 105 + slog.Info("fetching tangled user list from bsky.network") 106 + tangledDIDs, err := fetchTangledDIDs(ctx) 105 107 if err != nil { 106 - slog.Warn("failed to fetch knot members, continuing without", "error", err) 108 + slog.Warn("failed to fetch tangled DIDs from bsky.network", "error", err) 107 109 } else { 108 - slog.Info("found knot members", "count", len(knotMembers)) 109 - for _, km := range knotMembers { 110 - seedDIDs = append(seedDIDs, km.MemberDID) 111 - if err := store.UpsertKnotMember(km); err != nil { 112 - slog.Warn("failed to store knot member", "error", err) 110 + slog.Info("found tangled DIDs", "count", len(tangledDIDs)) 111 + seedDIDs = append(seedDIDs, tangledDIDs...) 112 + } 113 + 114 + // fallback: knot members 115 + if len(seedDIDs) == 0 { 116 + slog.Info("seeding from knot members", "knot", *knotDID) 117 + knotMembers, err := fetchKnotMembers(ctx, *knotDID) 118 + if err != nil { 119 + slog.Warn("failed to fetch knot members", "error", err) 120 + } else { 121 + slog.Info("found knot members", "count", len(knotMembers)) 122 + for _, km := range knotMembers { 123 + seedDIDs = append(seedDIDs, km.MemberDID) 124 + if err := store.UpsertKnotMember(km); err != nil { 125 + slog.Warn("failed to store knot member", "error", err) 126 + } 113 127 } 114 128 } 115 129 } 116 130 131 + // fallback: existing DB DIDs 117 132 if len(seedDIDs) == 0 { 118 - slog.Error("no seed DIDs; pass DIDs as arguments or ensure knot is reachable") 133 + seedDIDs = bootstrapFromDB(store) 134 + } 135 + 136 + if len(seedDIDs) == 0 { 137 + slog.Error("no seed DIDs; pass DIDs as arguments or ensure network is reachable") 119 138 os.Exit(1) 120 139 } 121 140 ··· 130 149 131 150 slog.Info("starting snowball backfill", "seeds", len(cleanSeeds), "workers", *workers) 132 151 133 - pdsCache := &sync.Map{} 152 + pdsCache := &pdsLimiter{urls: &sync.Map{}} 134 153 135 154 visited := sync.Map{} 136 155 didCh := make(chan string, 10000) ··· 296 315 return dids 297 316 } 298 317 318 + func fetchTangledDIDs(ctx context.Context) ([]string, error) { 319 + var dids []string 320 + cursor := "" 321 + 322 + for { 323 + select { 324 + case <-ctx.Done(): 325 + return dids, ctx.Err() 326 + default: 327 + } 328 + 329 + u := fmt.Sprintf("%s?collection=%s&limit=1000", ListReposByCollectionURL, TangledProfileCollection) 330 + if cursor != "" { 331 + u += "&cursor=" + cursor 332 + } 333 + 334 + req, err := http.NewRequestWithContext(ctx, "GET", u, nil) 335 + if err != nil { 336 + return dids, err 337 + } 338 + 339 + resp, err := httpClient.Do(req) 340 + if err != nil { 341 + return dids, err 342 + } 343 + 344 + if resp.StatusCode != 200 { 345 + resp.Body.Close() 346 + return dids, fmt.Errorf("listReposByCollection returned %d", resp.StatusCode) 347 + } 348 + 349 + var result struct { 350 + Cursor string `json:"cursor"` 351 + Repos []struct { 352 + DID string `json:"did"` 353 + } `json:"repos"` 354 + } 355 + err = json.NewDecoder(resp.Body).Decode(&result) 356 + resp.Body.Close() 357 + if err != nil { 358 + return dids, err 359 + } 360 + 361 + for _, r := range result.Repos { 362 + if isValidDID(r.DID) { 363 + dids = append(dids, r.DID) 364 + } 365 + } 366 + 367 + slog.Info("fetched tangled DIDs", "count", len(dids), "cursor", result.Cursor != "") 368 + 369 + if result.Cursor == "" || len(result.Repos) == 0 { 370 + break 371 + } 372 + cursor = result.Cursor 373 + } 374 + 375 + return dids, nil 376 + } 377 + 299 378 func fetchKnotMembers(ctx context.Context, knotDID string) ([]db.KnotMember, error) { 300 - pdsURL, err := cachedResolvePDS(ctx, nil, knotDID) 379 + pdsURL, err := resolvePDS(ctx, knotDID) 301 380 if err != nil { 302 381 return nil, fmt.Errorf("resolve PDS for knot: %w", err) 303 382 } ··· 350 429 return members, nil 351 430 } 352 431 353 - func backfillDID(ctx context.Context, store *db.Store, pdsCache *sync.Map, did string, vouchCount, followCount *atomic.Int64) ([]string, error) { 354 - pdsURL, err := cachedResolvePDS(ctx, pdsCache, did) 432 + func backfillDID(ctx context.Context, store *db.Store, limiter *pdsLimiter, did string, vouchCount, followCount *atomic.Int64) ([]string, error) { 433 + pdsURL, err := limiter.Get(ctx, did) 355 434 if err != nil { 356 435 return nil, fmt.Errorf("resolve PDS: %w", err) 357 436 } 358 437 438 + // wait if this PDS is rate-limited 439 + if err := limiter.WaitIfLimited(ctx, pdsURL); err != nil { 440 + return nil, err 441 + } 442 + 359 443 var newDIDs []string 360 444 361 - type result struct { 362 - dids []string 363 - count int 364 - err error 445 + // always fetch vouches 446 + vouchDIDs, vouchN, err := fetchVouches(ctx, store, limiter, pdsURL, did) 447 + if err != nil { 448 + if rle, ok := err.(*rateLimitError); ok { 449 + limiter.RateLimited(pdsURL, rle.retryAfter) 450 + } 451 + return newDIDs, fmt.Errorf("fetch vouches: %w", err) 365 452 } 366 - 367 - vouchCh := make(chan result, 1) 368 - followCh := make(chan result, 1) 453 + vouchCount.Add(int64(vouchN)) 454 + newDIDs = append(newDIDs, vouchDIDs...) 369 455 370 - go func() { 371 - dids, count, err := fetchVouches(ctx, store, pdsURL, did) 372 - vouchCh <- result{dids, count, err} 373 - }() 374 - 375 - go func() { 376 - dids, count, err := fetchFollows(ctx, store, pdsURL, did) 377 - followCh <- result{dids, count, err} 378 - }() 379 - 380 - vr := <-vouchCh 381 - if vr.err != nil { 382 - return newDIDs, fmt.Errorf("fetch vouches: %w", vr.err) 456 + // only fetch follows if this DID is on tangled 457 + onTangled, err := hasTangledProfile(ctx, pdsURL, did) 458 + if err != nil { 459 + slog.Warn("tangled profile check failed", "did", did, "error", err) 460 + onTangled = false 383 461 } 384 - vouchCount.Add(int64(vr.count)) 385 - newDIDs = append(newDIDs, vr.dids...) 386 462 387 - fr := <-followCh 388 - if fr.err != nil { 389 - slog.Warn("fetch follows failed", "did", did, "error", fr.err) 390 - } else { 391 - followCount.Add(int64(fr.count)) 392 - newDIDs = append(newDIDs, fr.dids...) 463 + if onTangled { 464 + // wait again in case vouch fetch hit a rate limit 465 + if err := limiter.WaitIfLimited(ctx, pdsURL); err != nil { 466 + return newDIDs, err 467 + } 468 + followDIDs, followN, err := fetchFollows(ctx, store, limiter, pdsURL, did) 469 + if err != nil { 470 + if rle, ok := err.(*rateLimitError); ok { 471 + limiter.RateLimited(pdsURL, rle.retryAfter) 472 + } 473 + slog.Warn("fetch follows failed", "did", did, "error", err) 474 + } else { 475 + followCount.Add(int64(followN)) 476 + newDIDs = append(newDIDs, followDIDs...) 477 + } 393 478 } 394 479 395 480 return newDIDs, nil 396 481 } 397 482 398 - func fetchVouches(ctx context.Context, store *db.Store, pdsURL, did string) ([]string, int, error) { 483 + func fetchVouches(ctx context.Context, store *db.Store, limiter *pdsLimiter, pdsURL, did string) ([]string, int, error) { 399 484 var newDIDs []string 400 485 var vouches []db.Vouch 401 486 cursor := "" 402 487 403 488 for { 489 + if err := limiter.WaitIfLimited(ctx, pdsURL); err != nil { 490 + return newDIDs, 0, err 491 + } 404 492 records, nextCursor, err := listRecordsWithRetry(ctx, pdsURL, did, VouchCollection, cursor) 405 493 if err != nil { 406 494 return newDIDs, 0, err ··· 451 539 return newDIDs, len(vouches), nil 452 540 } 453 541 454 - func fetchFollows(ctx context.Context, store *db.Store, pdsURL, did string) ([]string, int, error) { 542 + func fetchFollows(ctx context.Context, store *db.Store, limiter *pdsLimiter, pdsURL, did string) ([]string, int, error) { 455 543 var newDIDs []string 456 544 var follows []db.Follow 457 545 cursor := "" 458 546 459 547 for { 548 + if err := limiter.WaitIfLimited(ctx, pdsURL); err != nil { 549 + return newDIDs, 0, err 550 + } 460 551 records, nextCursor, err := listRecordsWithRetry(ctx, pdsURL, did, FollowCollection, cursor) 461 552 if err != nil { 462 553 return newDIDs, 0, err ··· 501 592 return newDIDs, len(follows), nil 502 593 } 503 594 595 + type rateLimitError struct { 596 + retryAfter time.Duration 597 + } 598 + 599 + func (e *rateLimitError) Error() string { 600 + return fmt.Sprintf("rate limited, retry after %s", e.retryAfter) 601 + } 602 + 504 603 type Record struct { 505 604 URI string 506 605 Value json.RawMessage ··· 508 607 509 608 func listRecordsWithRetry(ctx context.Context, pdsURL, did, collection, cursor string) ([]Record, string, error) { 510 609 var lastErr error 511 - for attempt := 0; attempt <= MaxRetries; attempt++ { 610 + for attempt := 0; attempt < MaxRetries; attempt++ { 512 611 if attempt > 0 { 513 612 delay := RetryBaseDelay * time.Duration(math.Pow(2, float64(attempt-1))) 613 + if rle, ok := lastErr.(*rateLimitError); ok { 614 + delay = rle.retryAfter 615 + } 616 + slog.Info("retrying", "collection", collection, "did", did, "attempt", attempt+1, "delay", delay) 514 617 select { 515 618 case <-ctx.Done(): 516 619 return nil, "", ctx.Err() ··· 522 625 return records, nextCursor, nil 523 626 } 524 627 lastErr = err 628 + // don't retry non-429 errors more than 3 times 629 + if _, ok := err.(*rateLimitError); !ok && attempt >= 2 { 630 + break 631 + } 525 632 } 526 633 return nil, "", lastErr 527 634 } ··· 544 651 } 545 652 defer resp.Body.Close() 546 653 654 + if resp.StatusCode == 429 { 655 + retryAfter := RateLimitDelay 656 + if ra := resp.Header.Get("Retry-After"); ra != "" { 657 + if secs, err := strconv.Atoi(ra); err == nil && secs > 0 { 658 + retryAfter = time.Duration(secs) * time.Second 659 + } 660 + } 661 + resp.Body.Close() 662 + return nil, "", &rateLimitError{retryAfter: retryAfter} 663 + } 664 + 547 665 if resp.StatusCode == 404 { 666 + resp.Body.Close() 548 667 return nil, "", nil 549 668 } 550 669 551 670 if resp.StatusCode != 200 { 552 671 body, _ := io.ReadAll(resp.Body) 672 + resp.Body.Close() 553 673 return nil, "", fmt.Errorf("listRecords %s returned %d: %s", collection, resp.StatusCode, truncate(string(body), 200)) 554 674 } 555 675 ··· 566 686 return records, result.Cursor, nil 567 687 } 568 688 569 - func cachedResolvePDS(ctx context.Context, cache *sync.Map, did string) (string, error) { 570 - if cache != nil { 571 - if v, ok := cache.Load(did); ok { 572 - return v.(string), nil 573 - } 689 + func hasTangledProfile(ctx context.Context, pdsURL, did string) (bool, error) { 690 + u := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=%s&limit=1", 691 + pdsURL, did, TangledProfileCollection) 692 + 693 + req, err := http.NewRequestWithContext(ctx, "GET", u, nil) 694 + if err != nil { 695 + return false, err 696 + } 697 + 698 + resp, err := httpClient.Do(req) 699 + if err != nil { 700 + return false, err 701 + } 702 + defer resp.Body.Close() 703 + 704 + if resp.StatusCode == 404 { 705 + return false, nil 706 + } 707 + if resp.StatusCode != 200 { 708 + return false, fmt.Errorf("tangled profile check returned %d", resp.StatusCode) 709 + } 710 + 711 + var result ListRecordsResponse 712 + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 713 + return false, err 714 + } 715 + 716 + return len(result.Records) > 0, nil 717 + } 718 + 719 + type pdsLimiter struct { 720 + urls *sync.Map // did -> pdsURL 721 + until sync.Map // pdsURL -> time.Time (rate-limited until) 722 + } 723 + 724 + func (p *pdsLimiter) Get(ctx context.Context, did string) (string, error) { 725 + if v, ok := p.urls.Load(did); ok { 726 + return v.(string), nil 574 727 } 575 728 576 729 pdsURL, err := resolvePDS(ctx, did) ··· 578 731 return "", err 579 732 } 580 733 581 - if cache != nil { 582 - cache.Store(did, pdsURL) 583 - } 734 + p.urls.Store(did, pdsURL) 584 735 return pdsURL, nil 736 + } 737 + 738 + func (p *pdsLimiter) RateLimited(pdsURL string, dur time.Duration) { 739 + p.until.Store(pdsURL, time.Now().Add(dur)) 740 + } 741 + 742 + func (p *pdsLimiter) WaitIfLimited(ctx context.Context, pdsURL string) error { 743 + if v, ok := p.until.Load(pdsURL); ok { 744 + deadline := v.(time.Time) 745 + if wait := time.Until(deadline); wait > 0 { 746 + select { 747 + case <-ctx.Done(): 748 + return ctx.Err() 749 + case <-time.After(wait): 750 + } 751 + } 752 + } 753 + return nil 754 + } 755 + 756 + func cachedResolvePDS(ctx context.Context, cache *pdsLimiter, did string) (string, error) { 757 + return cache.Get(ctx, did) 585 758 } 586 759 587 760 func resolvePDS(ctx context.Context, did string) (string, error) {
+9
cmd/ingest/main.go
··· 33 33 } 34 34 defer store.Close() 35 35 36 + total, vouches, denounces, follows, knot, _ := store.Stats() 37 + slog.Info("database loaded", 38 + "vouches", vouches, 39 + "denounces", denounces, 40 + "follows", follows, 41 + "knot_members", knot, 42 + "total_records", total, 43 + ) 44 + 36 45 ctx, cancel := context.WithCancel(context.Background()) 37 46 defer cancel() 38 47
+2
internal/resolve/resolve.go
··· 209 209 } 210 210 211 211 if p.Handle == "" && p.AvatarURL == "" { 212 + // tombstone: mark as unresolvable so we don't retry 213 + store.UpsertProfile(db.Profile{DID: did, Handle: "!", UpdatedAt: time.Now()}) 212 214 continue 213 215 } 214 216