this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Triage autoscaler tuning (#561)

authored by

Jaz and committed by
GitHub
3154037c 4ef89d60

+78 -42
+8
backfill/backfill.go
··· 49 49 UpdateRev(ctx context.Context, repo, rev string) error 50 50 51 51 EnqueueJob(ctx context.Context, repo string) error 52 + EnqueueJobWithState(ctx context.Context, repo string, state string) error 52 53 53 54 PurgeRepo(ctx context.Context, repo string) error 54 55 } ··· 486 487 }) 487 488 default: 488 489 return fmt.Errorf("invalid op action: %q", op.Action) 490 + } 491 + } 492 + 493 + if evt.Prev == nil { 494 + // The first event for a repo will have a nil prev, we can enqueue the repo as "complete" to avoid fetching the empty repo 495 + if err := bf.Store.EnqueueJobWithState(ctx, evt.Repo, StateComplete); err != nil { 496 + return fmt.Errorf("failed to enqueue job with state for repo %q: %w", evt.Repo, err) 489 497 } 490 498 } 491 499
+13
backfill/gormstore.go
··· 105 105 return nil 106 106 } 107 107 108 + func (s *Gormstore) EnqueueJobWithState(ctx context.Context, repo, state string) error { 109 + _, err := s.GetOrCreateJob(ctx, repo, state) 110 + if err != nil { 111 + return err 112 + } 113 + 114 + s.qlk.Lock() 115 + s.taskQueue = append(s.taskQueue, repo) 116 + s.qlk.Unlock() 117 + 118 + return nil 119 + } 120 + 108 121 func (s *Gormstore) createJobForRepo(repo, state string) error { 109 122 dbj := &GormDBJob{ 110 123 Repo: repo,
+18
backfill/memstore.go
··· 63 63 return nil 64 64 } 65 65 66 + func (s *Memstore) EnqueueJobWithState(repo, state string) error { 67 + s.lk.Lock() 68 + defer s.lk.Unlock() 69 + 70 + if _, ok := s.jobs[repo]; ok { 71 + return fmt.Errorf("job already exists for repo %s", repo) 72 + } 73 + 74 + j := &Memjob{ 75 + repo: repo, 76 + createdAt: time.Now(), 77 + updatedAt: time.Now(), 78 + state: state, 79 + } 80 + s.jobs[repo] = j 81 + return nil 82 + } 83 + 66 84 func (s *Memstore) BufferOp(ctx context.Context, repo string, since *string, rev, kind, path string, rec *[]byte, cid *cid.Cid) (bool, error) { 67 85 s.lk.Lock() 68 86
+2 -1
bgs/bgs.go
··· 107 107 EventsSent promclient.Counter 108 108 } 109 109 110 - func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, blobs blobs.BlobStore, rf *indexer.RepoFetcher, hr api.HandleResolver, ssl bool) (*BGS, error) { 110 + func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, blobs blobs.BlobStore, rf *indexer.RepoFetcher, hr api.HandleResolver, ssl bool, compactInterval time.Duration) (*BGS, error) { 111 111 db.AutoMigrate(User{}) 112 112 db.AutoMigrate(AuthToken{}) 113 113 db.AutoMigrate(models.PDS{}) ··· 146 146 } 147 147 148 148 compactor := NewCompactor(nil) 149 + compactor.requeueInterval = compactInterval 149 150 compactor.Start(bgs) 150 151 bgs.compactor = compactor 151 152
+22 -28
bgs/compactor.go
··· 186 186 func (c *Compactor) Start(bgs *BGS) { 187 187 log.Info("starting compactor") 188 188 go c.doWork(bgs) 189 - go func() { 190 - log.Infow("starting compactor requeue routine", 191 - "interval", c.requeueInterval, 192 - "limit", c.requeueLimit, 193 - "shardCount", c.requeueShardCount, 194 - "fast", c.requeueFast, 195 - ) 189 + if c.requeueInterval > 0 { 190 + go func() { 191 + log.Infow("starting compactor requeue routine", 192 + "interval", c.requeueInterval, 193 + "limit", c.requeueLimit, 194 + "shardCount", c.requeueShardCount, 195 + "fast", c.requeueFast, 196 + ) 196 197 197 - // Enqueue all repos on startup 198 - ctx := context.Background() 199 - ctx, span := otel.Tracer("compactor").Start(ctx, "RequeueRoutine") 200 - if err := c.EnqueueAllRepos(ctx, bgs, c.requeueLimit, c.requeueShardCount, c.requeueFast); err != nil { 201 - log.Errorw("failed to enqueue all repos", "err", err) 202 - } 203 - span.End() 204 - 205 - t := time.NewTicker(c.requeueInterval) 206 - for { 207 - select { 208 - case <-c.exit: 209 - return 210 - case <-t.C: 211 - ctx := context.Background() 212 - ctx, span := otel.Tracer("compactor").Start(ctx, "RequeueRoutine") 213 - if err := c.EnqueueAllRepos(ctx, bgs, c.requeueLimit, c.requeueShardCount, c.requeueFast); err != nil { 214 - log.Errorw("failed to enqueue all repos", "err", err) 198 + t := time.NewTicker(c.requeueInterval) 199 + for { 200 + select { 201 + case <-c.exit: 202 + return 203 + case <-t.C: 204 + ctx := context.Background() 205 + ctx, span := otel.Tracer("compactor").Start(ctx, "RequeueRoutine") 206 + if err := c.EnqueueAllRepos(ctx, bgs, c.requeueLimit, c.requeueShardCount, c.requeueFast); err != nil { 207 + log.Errorw("failed to enqueue all repos", "err", err) 208 + } 209 + span.End() 215 210 } 216 - span.End() 217 211 } 218 - } 219 - }() 212 + }() 213 + } 220 214 } 221 215 222 216 // Shutdown shuts down the compactor
+7 -11
bgs/fedmgr.go
··· 11 11 12 12 comatproto "github.com/bluesky-social/indigo/api/atproto" 13 13 "github.com/bluesky-social/indigo/events" 14 - "github.com/bluesky-social/indigo/events/schedulers/autoscaling" 14 + "github.com/bluesky-social/indigo/events/schedulers/parallel" 15 15 "github.com/bluesky-social/indigo/models" 16 16 "go.opentelemetry.io/otel" 17 17 "golang.org/x/time/rate" ··· 536 536 537 537 instrumentedRSC := events.NewInstrumentedRepoStreamCallbacks(limiter, rsc.EventHandler) 538 538 539 - scalingSettings := autoscaling.AutoscaleSettings{ 540 - Concurrency: 1, 541 - MaxConcurrency: 360, 542 - AutoscaleFrequency: time.Second, 543 - ThroughputBucketCount: 60, 544 - ThroughputBucketDuration: time.Second, 545 - MaximumBufferedItemsPerRepo: 100, 546 - } 547 - 548 - pool := autoscaling.NewScheduler(scalingSettings, con.RemoteAddr().String(), instrumentedRSC.EventHandler) 539 + pool := parallel.NewScheduler( 540 + 100, 541 + 1_000, 542 + con.RemoteAddr().String(), 543 + instrumentedRSC.EventHandler, 544 + ) 549 545 return events.HandleRepoStream(ctx, con, pool) 550 546 } 551 547
+7 -1
cmd/bigsky/main.go
··· 139 139 EnvVars: []string{"MAX_METADB_CONNECTIONS"}, 140 140 Value: 40, 141 141 }, 142 + &cli.DurationFlag{ 143 + Name: "compact-interval", 144 + EnvVars: []string{"BGS_COMPACT_INTERVAL"}, 145 + Value: 4 * time.Hour, 146 + Usage: "interval between compaction runs, set to 0 to disable scheduled compaction", 147 + }, 142 148 } 143 149 144 150 app.Action = Bigsky ··· 337 343 } 338 344 339 345 log.Infow("constructing bgs") 340 - bgs, err := libbgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, rf, hr, !cctx.Bool("crawl-insecure-ws")) 346 + bgs, err := libbgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, rf, hr, !cctx.Bool("crawl-insecure-ws"), cctx.Duration("compact-interval")) 341 347 if err != nil { 342 348 return err 343 349 }
+1 -1
testing/utils.go
··· 452 452 453 453 tr := &api.TestHandleResolver{} 454 454 455 - b, err := bgs.NewBGS(maindb, ix, repoman, evtman, didr, nil, rf, tr, false) 455 + b, err := bgs.NewBGS(maindb, ix, repoman, evtman, didr, nil, rf, tr, false, time.Hour*4) 456 456 if err != nil { 457 457 return nil, err 458 458 }