this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Get repos from the BGS in Search (#335)

authored by

Jaz and committed by
GitHub
9ac5e8ad a2219fc5

+206 -21
+5
atproto/identity/base_directory.go
··· 7 7 "net/http" 8 8 9 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 + "golang.org/x/time/rate" 10 11 ) 11 12 12 13 // The zero value ('BaseDirectory{}') is a usable Directory. 13 14 type BaseDirectory struct { 14 15 // if non-empty, this string should have URL method, hostname, and optional port; it should not have a path or trailing slash 15 16 PLCURL string 17 + // If not nil, this limiter will be used to rate-limit requests to the PLCURL 18 + PLCLimiter *rate.Limiter 19 + // If not nil, this function will be called inline with DID Web lookups, and can be used to limit the number of requests to a given hostname 20 + DIDWebLimitFunc func(ctx context.Context, hostname string) error 16 21 // HTTP client used for did:web, did:plc, and HTTP (well-known) handle resolution 17 22 HTTPClient http.Client 18 23 // DNS resolver used for DNS handle resolution. Calling code can use a custom Dialer to query against a specific DNS server, or re-implement the interface for even more control over the resolution process
+30
atproto/identity/cache_directory.go
··· 6 6 "time" 7 7 8 8 "github.com/bluesky-social/indigo/atproto/syntax" 9 + "github.com/prometheus/client_golang/prometheus" 10 + "github.com/prometheus/client_golang/prometheus/promauto" 9 11 10 12 "github.com/hashicorp/golang-lru/v2/expirable" 11 13 ) ··· 29 31 Err error 30 32 } 31 33 34 + var handleCacheHits = promauto.NewCounter(prometheus.CounterOpts{ 35 + Name: "atproto_directory_handle_cache_hits", 36 + Help: "Number of cache hits for ATProto handle lookups", 37 + }) 38 + 39 + var handleCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ 40 + Name: "atproto_directory_handle_cache_misses", 41 + Help: "Number of cache misses for ATProto handle lookups", 42 + }) 43 + 44 + var identityCacheHits = promauto.NewCounter(prometheus.CounterOpts{ 45 + Name: "atproto_directory_identity_cache_hits", 46 + Help: "Number of cache hits for ATProto identity lookups", 47 + }) 48 + 49 + var identityCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ 50 + Name: "atproto_directory_identity_cache_misses", 51 + Help: "Number of cache misses for ATProto identity lookups", 52 + }) 53 + 32 54 var _ Directory = (*CacheDirectory)(nil) 33 55 34 56 // Capacity of zero means unlimited size. Similarly, ttl of zero means unlimited duration. ··· 89 111 maybeEntry, ok := d.handleCache.Get(h) 90 112 91 113 if !ok { 114 + handleCacheMisses.Inc() 92 115 entry, err = d.updateHandle(ctx, h) 93 116 if err != nil { 94 117 return "", err ··· 97 120 entry = &maybeEntry 98 121 } 99 122 if d.IsHandleStale(entry) { 123 + handleCacheMisses.Inc() 100 124 entry, err = d.updateHandle(ctx, h) 101 125 if err != nil { 102 126 return "", err 103 127 } 128 + } else { 129 + handleCacheHits.Inc() 104 130 } 105 131 return entry.DID, entry.Err 106 132 } ··· 136 162 maybeEntry, ok := d.identityCache.Get(did) 137 163 138 164 if !ok { 165 + identityCacheMisses.Inc() 139 166 entry, err = d.updateDID(ctx, did) 140 167 if err != nil { 141 168 return nil, err ··· 144 171 entry = &maybeEntry 145 172 } 146 173 if d.IsIdentityStale(entry) { 174 + identityCacheMisses.Inc() 147 175 entry, err = d.updateDID(ctx, did) 148 176 if err != nil { 149 177 return nil, err 150 178 } 179 + } else { 180 + identityCacheHits.Inc() 151 181 } 152 182 return entry.Identity, entry.Err 153 183 }
+14
atproto/identity/did.go
··· 67 67 68 68 // TODO: use a more robust client 69 69 // TODO: allow ctx to specify unsafe http:// resolution, for testing? 70 + 71 + if d.DIDWebLimitFunc != nil { 72 + if err := d.DIDWebLimitFunc(ctx, hostname); err != nil { 73 + return nil, fmt.Errorf("did:web limit func returned an error for (%s): %w", hostname, err) 74 + } 75 + } 76 + 70 77 resp, err := http.Get("https://" + hostname + "/.well-known/did.json") 71 78 // look for NXDOMAIN 72 79 var dnsErr *net.DNSError ··· 101 108 if plcURL == "" { 102 109 plcURL = DefaultPLCURL 103 110 } 111 + 112 + if d.PLCLimiter != nil { 113 + if err := d.PLCLimiter.Wait(ctx); err != nil { 114 + return nil, fmt.Errorf("failed to wait for PLC limiter: %w", err) 115 + } 116 + } 117 + 104 118 resp, err := http.Get(plcURL + "/" + did.String()) 105 119 if err != nil { 106 120 return nil, fmt.Errorf("failed did:plc directory resolution: %w", err)
+2 -2
backfill/backfill.go
··· 101 101 ParallelRecordCreates: 100, 102 102 NSIDFilter: "", 103 103 SyncRequestsPerSecond: 2, 104 - CheckoutPath: "https://bsky.social/xrpc/com.atproto.sync.getCheckout", 104 + CheckoutPath: "https://bsky.social/xrpc/com.atproto.sync.getRepo", 105 105 } 106 106 } 107 107 ··· 261 261 // GET and CAR decode the body 262 262 client := &http.Client{ 263 263 Transport: otelhttp.NewTransport(http.DefaultTransport), 264 - Timeout: 120 * time.Second, 264 + Timeout: 600 * time.Second, 265 265 } 266 266 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 267 267 if err != nil {
+26 -18
backfill/gormstore.go
··· 45 45 } 46 46 47 47 func (s *Gormstore) LoadJobs(ctx context.Context) error { 48 - // Load all jobs from the database 49 - var dbjobs []*GormDBJob 50 - if err := s.db.Find(&dbjobs).Error; err != nil { 51 - return err 52 - } 53 - 48 + limit := 20_000 49 + offset := 0 54 50 s.lk.Lock() 55 51 defer s.lk.Unlock() 56 52 57 - // Convert them to in-memory jobs 58 - for i := range dbjobs { 59 - dbj := dbjobs[i] 60 - j := &Gormjob{ 61 - repo: dbj.Repo, 62 - state: dbj.State, 63 - bufferedOps: map[string][]*bufferedOp{}, 64 - createdAt: dbj.CreatedAt, 65 - updatedAt: dbj.UpdatedAt, 53 + for { 54 + var dbjobs []*GormDBJob 55 + // Load all jobs from the database 56 + if err := s.db.Limit(limit).Offset(offset).Find(&dbjobs).Error; err != nil { 57 + return err 58 + } 59 + if len(dbjobs) == 0 { 60 + break 61 + } 62 + offset += len(dbjobs) 66 63 67 - dbj: dbj, 68 - db: s.db, 64 + // Convert them to in-memory jobs 65 + for i := range dbjobs { 66 + dbj := dbjobs[i] 67 + j := &Gormjob{ 68 + repo: dbj.Repo, 69 + state: dbj.State, 70 + bufferedOps: map[string][]*bufferedOp{}, 71 + createdAt: dbj.CreatedAt, 72 + updatedAt: dbj.UpdatedAt, 73 + 74 + dbj: dbj, 75 + db: s.db, 76 + } 77 + s.jobs[dbj.Repo] = j 69 78 } 70 - s.jobs[dbj.Repo] = j 71 79 } 72 80 73 81 return nil
+9 -1
cmd/palomar/main.go
··· 11 11 "time" 12 12 13 13 _ "github.com/joho/godotenv/autoload" 14 + "golang.org/x/time/rate" 14 15 15 16 "github.com/bluesky-social/indigo/atproto/identity" 16 17 "github.com/bluesky-social/indigo/search" ··· 132 133 Value: 20, 133 134 EnvVars: []string{"PALOMAR_INDEX_MAX_CONCURRENCY"}, 134 135 }, 136 + &cli.IntFlag{ 137 + Name: "plc-rate-limit", 138 + Usage: "max number of requests per second to PLC registry", 139 + Value: 100, 140 + EnvVars: []string{"PALOMAR_PLC_RATE_LIMIT"}, 141 + }, 135 142 }, 136 143 Action: func(cctx *cli.Context) error { 137 144 logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ ··· 155 162 HTTPClient: http.Client{ 156 163 Timeout: time.Second * 15, 157 164 }, 165 + PLCLimiter: rate.NewLimiter(rate.Limit(cctx.Int("plc-rate-limit")), 1), 158 166 TryAuthoritativeDNS: true, 159 167 SkipDNSDomainSuffixes: []string{".bsky.social"}, 160 168 } 161 - dir := identity.NewCacheDirectory(&base, 200000, time.Hour*24, time.Minute*2) 169 + dir := identity.NewCacheDirectory(&base, 1_500_000, time.Hour*24, time.Minute*2) 162 170 163 171 srv, err := search.NewServer( 164 172 db,
+65
search/firehose.go
··· 7 7 "net/http" 8 8 "net/url" 9 9 "strings" 10 + "time" 10 11 11 12 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 13 bsky "github.com/bluesky-social/indigo/api/bsky" ··· 51 52 return fmt.Errorf("loading backfill jobs: %w", err) 52 53 } 53 54 go s.bf.Start() 55 + go s.discoverRepos() 54 56 55 57 d := websocket.DefaultDialer 56 58 u, err := url.Parse(s.bgshost) ··· 92 94 return nil 93 95 } 94 96 97 + // Check if we've backfilled this repo, if not, we should enqueue it 98 + job, err := s.bfs.GetJob(ctx, evt.Repo) 99 + if job == nil && err == nil { 100 + logEvt.Info("enqueueing backfill job for new repo") 101 + if err := s.bfs.EnqueueJob(evt.Repo); err != nil { 102 + logEvt.Warn("failed to enqueue backfill job", "err", err) 103 + } 104 + } 105 + 95 106 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 96 107 if err != nil { 97 108 // TODO: handle this case (instead of return nil) ··· 157 168 rsc.EventHandler, 158 169 ), 159 170 ) 171 + } 172 + 173 + func (s *Server) discoverRepos() { 174 + ctx := context.Background() 175 + log := s.logger.With("func", "discoverRepos") 176 + log.Info("starting repo discovery") 177 + 178 + cursor := "" 179 + limit := int64(500) 180 + 181 + totalEnqueued := 0 182 + totalSkipped := 0 183 + totalErrored := 0 184 + 185 + for { 186 + resp, err := comatproto.SyncListRepos(ctx, s.bgsxrpc, cursor, limit) 187 + if err != nil { 188 + log.Error("failed to list repos", "err", err) 189 + time.Sleep(5 * time.Second) 190 + continue 191 + } 192 + log.Info("got repo page", "count", len(resp.Repos), "cursor", resp.Cursor) 193 + enqueued := 0 194 + skipped := 0 195 + errored := 0 196 + for _, repo := range resp.Repos { 197 + job, err := s.bfs.GetJob(ctx, repo.Did) 198 + if job == nil && err == nil { 199 + log.Info("enqueuing backfill job for new repo", "did", repo.Did) 200 + if err := s.bfs.EnqueueJob(repo.Did); err != nil { 201 + log.Warn("failed to enqueue backfill job", "err", err) 202 + errored++ 203 + continue 204 + } 205 + enqueued++ 206 + } else if err != nil { 207 + log.Warn("failed to get backfill job", "did", repo.Did, "err", err) 208 + errored++ 209 + } else { 210 + skipped++ 211 + } 212 + } 213 + log.Info("enqueued repos", "enqueued", enqueued, "skipped", skipped, "errored", errored) 214 + totalEnqueued += enqueued 215 + totalSkipped += skipped 216 + totalErrored += errored 217 + if resp.Cursor != nil && *resp.Cursor != "" { 218 + cursor = *resp.Cursor 219 + } else { 220 + break 221 + } 222 + } 223 + 224 + log.Info("finished repo discovery", "totalEnqueued", totalEnqueued, "totalSkipped", totalSkipped, "totalErrored", totalErrored) 160 225 } 161 226 162 227 func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path string, recP *typegen.CBORMarshaler, rcid *cid.Cid) error {
+53
search/handlers.go
··· 123 123 return e.JSON(200, out) 124 124 } 125 125 126 + type IndexError struct { 127 + DID string `json:"did"` 128 + Err string `json:"err"` 129 + } 130 + 131 + func (s *Server) handleIndexRepos(e echo.Context) error { 132 + ctx, span := otel.Tracer("search").Start(e.Request().Context(), "handleIndexRepos") 133 + defer span.End() 134 + 135 + dids, ok := e.QueryParams()["did"] 136 + if !ok { 137 + return e.JSON(400, map[string]any{ 138 + "error": "must pass at least one did to index", 139 + }) 140 + } 141 + 142 + for _, did := range dids { 143 + _, err := syntax.ParseDID(did) 144 + if err != nil { 145 + return e.JSON(400, map[string]any{ 146 + "error": fmt.Sprintf("invalid DID (%s): %s", did, err), 147 + }) 148 + } 149 + } 150 + 151 + errs := []IndexError{} 152 + successes := 0 153 + skipped := 0 154 + for _, did := range dids { 155 + job, err := s.bfs.GetJob(ctx, did) 156 + if job == nil && err == nil { 157 + err := s.bfs.EnqueueJob(did) 158 + if err != nil { 159 + errs = append(errs, IndexError{ 160 + DID: did, 161 + Err: err.Error(), 162 + }) 163 + continue 164 + } 165 + successes++ 166 + continue 167 + } 168 + skipped++ 169 + } 170 + 171 + return e.JSON(200, map[string]any{ 172 + "numEnqueued": successes, 173 + "numSkipped": skipped, 174 + "numErrored": len(errs), 175 + "errors": errs, 176 + }) 177 + } 178 + 126 179 func (s *Server) SearchPosts(ctx context.Context, q string, offset, size int) (*SearchPostsSkeletonResp, error) { 127 180 resp, err := DoSearchPosts(ctx, s.dir, s.escli, s.postIndex, q, offset, size) 128 181 if err != nil {
+2
search/server.go
··· 93 93 } else { 94 94 opts.SyncRequestsPerSecond = 8 95 95 } 96 + opts.CheckoutPath = fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo", bgshttp) 96 97 if config.IndexMaxConcurrency > 0 { 97 98 opts.ParallelRecordCreates = config.IndexMaxConcurrency 98 99 } else { ··· 199 200 e.GET("/metrics", echoprometheus.NewHandler()) 200 201 e.GET("/xrpc/app.bsky.unspecced.searchPostsSkeleton", s.handleSearchPostsSkeleton) 201 202 e.GET("/xrpc/app.bsky.unspecced.searchActorsSkeleton", s.handleSearchActorsSkeleton) 203 + e.GET("/xrpc/app.bsky.unspecced.indexRepos", s.handleIndexRepos) 202 204 s.echo = e 203 205 204 206 s.logger.Info("starting search API daemon", "bind", listen)