this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add Autoscaling Scheduler and refactor Events Worker Schedulers (#265)

# Events Worker Scheduler Refactor

Events worker schedulers have been refactored into the
`events/schedulers/{type}` packages.

The following schedulers are available:
- `sequential`: A standard single-worker scheduler that processes all
events in order
- `parallel`: A parallel event worker pool that runs a fixed number of
workers in their own goroutines, jobs for the same repo being run by the
same worker if they come in closely grouped batches.
- `autoscaling`: An autoscaling event worker pool that ramps up and down
the number of goroutines executing in parallel proportionally to event
throughput and tuned by configurable parameters.

All three schedulers now make use of a common set of Prometheus metrics
from the `schedulers` package, so dashboards and alarms can be re-used
between the different scheduling strategies.

# Autoscaling Events Scheduler

An autoscaling scheduler will start a configured number of workers
(default=1) and then scale up to a maximum configured number of workers.

The scheduler uses a Throughput Manager which contains a circular buffer
to keep track of a rolling window of event throughput.
- By default, the manager keeps track of the # of events per second over
the past 60 seconds and can compute the average throughput for the past
minute on demand
- The caller can configure how many buckets (default 60) are used for
computing the average throughput, as well as how frequently we rotate
those buckets (default to every second).
- The caller can also configure how frequently we poll the Manager for
the average throughput, allowing us to make scaling decisions with
higher reactivity, scaling up and down faster in response to large
changes in event throughput.
- By default, we poll the Throughput Manager once every 5 seconds to
make scaling decisions.

This allows for dynamic load-based scaling for PDS slurpers so we can
scale up to high throughout on connections that require it without
having to allocate tons of goroutines to every PDS connection.

We scale up if # of avg evts/sec > current concurrency and scale down if
the average throughput is < current concurrency - 1.

Locally tested with a bunch of debug logging for both scaling up and
down:

![image](https://github.com/bluesky-social/indigo/assets/1617325/60644dcd-21bf-4771-ae6a-9beb69d38b3b)

![image](https://github.com/bluesky-social/indigo/assets/1617325/a8f33e9a-6593-40a4-82fe-f2f5ad4fbb33)

authored by

Jaz and committed by
GitHub
5e72cf75 4584d18c

+607 -137
+10
bgs/bgs.go
··· 492 492 } 493 493 }() 494 494 495 + conn.SetPingHandler(func(message string) error { 496 + err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second*60)) 497 + if err == websocket.ErrCloseSent { 498 + return nil 499 + } else if e, ok := err.(net.Error); ok && e.Temporary() { 500 + return nil 501 + } 502 + return err 503 + }) 504 + 495 505 ident := c.RealIP() + "-" + c.Request().UserAgent() 496 506 497 507 evts, cleanup, err := bgs.events.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { return true }, since)
+10 -1
bgs/fedmgr.go
··· 10 10 11 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 12 "github.com/bluesky-social/indigo/events" 13 + "github.com/bluesky-social/indigo/events/schedulers/autoscaling" 13 14 "github.com/bluesky-social/indigo/models" 14 15 "go.opentelemetry.io/otel" 15 16 ··· 388 389 }, 389 390 } 390 391 391 - pool := events.NewConsumerPool(32, 20, con.RemoteAddr().String(), rsc.EventHandler) 392 + scalingSettings := autoscaling.AutoscaleSettings{ 393 + Concurrency: 1, 394 + MaxConcurrency: 360, 395 + AutoscaleFrequency: time.Second, 396 + ThroughputBucketCount: 60, 397 + ThroughputBucketDuration: time.Second, 398 + } 399 + 400 + pool := autoscaling.NewScheduler(scalingSettings, con.RemoteAddr().String(), rsc.EventHandler) 392 401 return events.HandleRepoStream(ctx, con, pool) 393 402 } 394 403
+7 -3
cmd/gosky/debug.go
··· 18 18 "github.com/bluesky-social/indigo/api/bsky" 19 19 "github.com/bluesky-social/indigo/did" 20 20 "github.com/bluesky-social/indigo/events" 21 + "github.com/bluesky-social/indigo/events/schedulers/sequential" 21 22 lexutil "github.com/bluesky-social/indigo/lex/util" 22 23 "github.com/bluesky-social/indigo/repo" 23 24 "github.com/bluesky-social/indigo/repomgr" ··· 97 98 }, 98 99 } 99 100 100 - err = events.HandleRepoStream(ctx, con, &events.SequentialScheduler{rsc.EventHandler}) 101 + seqScheduler := sequential.NewScheduler("debug-inspect-event", rsc.EventHandler) 102 + err = events.HandleRepoStream(ctx, con, seqScheduler) 101 103 if err != errFoundIt { 102 104 return err 103 105 } ··· 251 253 return fmt.Errorf("%s: %s", evt.Error, evt.Message) 252 254 }, 253 255 } 254 - err = events.HandleRepoStream(ctx, con, &events.SequentialScheduler{rsc.EventHandler}) 256 + seqScheduler := sequential.NewScheduler("debug-stream", rsc.EventHandler) 257 + err = events.HandleRepoStream(ctx, con, seqScheduler) 255 258 if err != nil { 256 259 return err 257 260 } ··· 371 374 return fmt.Errorf("%s: %s", evt.Error, evt.Message) 372 375 }, 373 376 } 374 - if err := events.HandleRepoStream(ctx, con, &events.SequentialScheduler{rsc.EventHandler}); err != nil { 377 + seqScheduler := sequential.NewScheduler(fmt.Sprintf("debug-stream-%d", i+1), rsc.EventHandler) 378 + if err := events.HandleRepoStream(ctx, con, seqScheduler); err != nil { 375 379 log.Fatalf("HandleRepoStream failure on url%d: %s", i+1, err) 376 380 } 377 381 }(i, url)
+3 -1
cmd/gosky/main.go
··· 20 20 "github.com/bluesky-social/indigo/api/bsky" 21 21 appbsky "github.com/bluesky-social/indigo/api/bsky" 22 22 "github.com/bluesky-social/indigo/events" 23 + "github.com/bluesky-social/indigo/events/schedulers/sequential" 23 24 lexutil "github.com/bluesky-social/indigo/lex/util" 24 25 "github.com/bluesky-social/indigo/repo" 25 26 "github.com/bluesky-social/indigo/util" ··· 1098 1099 return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message) 1099 1100 }, 1100 1101 } 1101 - return events.HandleRepoStream(ctx, con, &events.SequentialScheduler{rsc.EventHandler}) 1102 + seqScheduler := sequential.NewScheduler(con.RemoteAddr().String(), rsc.EventHandler) 1103 + return events.HandleRepoStream(ctx, con, seqScheduler) 1102 1104 }, 1103 1105 } 1104 1106
+7 -3
cmd/gosky/streamdiff.go
··· 7 7 8 8 comatproto "github.com/bluesky-social/indigo/api/atproto" 9 9 "github.com/bluesky-social/indigo/events" 10 + "github.com/bluesky-social/indigo/events/schedulers/sequential" 10 11 "github.com/gorilla/websocket" 11 12 cli "github.com/urfave/cli/v2" 12 13 ) ··· 55 56 return fmt.Errorf("%s: %s", evt.Error, evt.Message) 56 57 }, 57 58 } 58 - err = events.HandleRepoStream(ctx, cona, &events.SequentialScheduler{rsc.EventHandler}) 59 + seqScheduler := sequential.NewScheduler("streamA", rsc.EventHandler) 60 + err = events.HandleRepoStream(ctx, cona, seqScheduler) 59 61 if err != nil { 60 62 log.Errorf("stream A failed: %s", err) 61 63 } ··· 77 79 return fmt.Errorf("%s: %s", evt.Error, evt.Message) 78 80 }, 79 81 } 80 - err = events.HandleRepoStream(ctx, conb, &events.SequentialScheduler{rsc.EventHandler}) 82 + 83 + seqScheduler := sequential.NewScheduler("streamB", rsc.EventHandler) 84 + err = events.HandleRepoStream(ctx, conb, seqScheduler) 81 85 if err != nil { 82 - log.Errorf("stream A failed: %s", err) 86 + log.Errorf("stream B failed: %s", err) 83 87 } 84 88 }() 85 89
+6 -1
cmd/sonar/main.go
··· 13 13 "time" 14 14 15 15 "github.com/bluesky-social/indigo/events" 16 + "github.com/bluesky-social/indigo/events/schedulers/autoscaling" 16 17 "github.com/bluesky-social/indigo/sonar" 17 18 "github.com/bluesky-social/indigo/util/version" 18 19 "github.com/gorilla/websocket" ··· 108 109 109 110 wg := sync.WaitGroup{} 110 111 111 - pool := events.NewConsumerPool(cctx.Int("worker-count"), cctx.Int("max-queue-size"), u.Host, s.HandleStreamEvent) 112 + scalingSettings := autoscaling.DefaultAutoscaleSettings() 113 + scalingSettings.MaxConcurrency = cctx.Int("worker-count") 114 + scalingSettings.AutoscaleFrequency = time.Second 115 + 116 + pool := autoscaling.NewScheduler(scalingSettings, u.Host, s.HandleStreamEvent) 112 117 113 118 // Start a goroutine to manage the cursor file, saving the current cursor every 5 seconds. 114 119 go func() {
+12
events/consumer.go
··· 4 4 "context" 5 5 "fmt" 6 6 "io" 7 + "net" 7 8 "time" 8 9 9 10 comatproto "github.com/bluesky-social/indigo/api/atproto" ··· 62 63 func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler) error { 63 64 ctx, cancel := context.WithCancel(ctx) 64 65 defer cancel() 66 + defer sched.Shutdown() 65 67 66 68 remoteAddr := con.RemoteAddr().String() 67 69 ··· 81 83 } 82 84 } 83 85 }() 86 + 87 + con.SetPingHandler(func(message string) error { 88 + err := con.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second*60)) 89 + if err == websocket.ErrCloseSent { 90 + return nil 91 + } else if e, ok := err.(net.Error); ok && e.Temporary() { 92 + return nil 93 + } 94 + return err 95 + }) 84 96 85 97 lastSeq := int64(-1) 86 98 for {
+5
events/events.go
··· 17 17 18 18 var log = logging.Logger("events") 19 19 20 + type Scheduler interface { 21 + AddWork(ctx context.Context, repo string, val *XRPCStreamEvent) error 22 + Shutdown() 23 + } 24 + 20 25 type EventManager struct { 21 26 subs []*Subscriber 22 27 subsLk sync.Mutex
-15
events/metrics.go
··· 15 15 Help: "Total bytes received from the stream", 16 16 }, []string{"remote_addr"}) 17 17 18 - var workItemsAdded = promauto.NewCounterVec(prometheus.CounterOpts{ 19 - Name: "indigo_work_items_added_total", 20 - Help: "Total number of work items added to the consumer pool", 21 - }, []string{"pool"}) 22 - 23 - var workItemsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{ 24 - Name: "indigo_work_items_processed_total", 25 - Help: "Total number of work items processed by the consumer pool", 26 - }, []string{"pool"}) 27 - 28 - var workItemsActive = promauto.NewCounterVec(prometheus.CounterOpts{ 29 - Name: "indigo_work_items_active_total", 30 - Help: "Total number of work items passed into a worker", 31 - }, []string{"pool"}) 32 - 33 18 var eventsEnqueued = promauto.NewCounterVec(prometheus.CounterOpts{ 34 19 Name: "indigo_events_enqueued_for_broadcast_total", 35 20 Help: "Total number of events enqueued to broadcast to subscribers",
-110
events/parallel.go
··· 1 - package events 2 - 3 - import ( 4 - "context" 5 - "sync" 6 - ) 7 - 8 - type Scheduler interface { 9 - AddWork(ctx context.Context, repo string, val *XRPCStreamEvent) error 10 - } 11 - 12 - type SequentialScheduler struct { 13 - Do func(context.Context, *XRPCStreamEvent) error 14 - } 15 - 16 - func (s *SequentialScheduler) AddWork(ctx context.Context, repo string, val *XRPCStreamEvent) error { 17 - return s.Do(ctx, val) 18 - } 19 - 20 - type ParallelConsumerPool struct { 21 - maxConcurrency int 22 - maxQueue int 23 - 24 - do func(context.Context, *XRPCStreamEvent) error 25 - 26 - feeder chan *consumerTask 27 - 28 - lk sync.Mutex 29 - active map[string][]*consumerTask 30 - 31 - ident string 32 - } 33 - 34 - func NewConsumerPool(maxC, maxQ int, ident string, do func(context.Context, *XRPCStreamEvent) error) *ParallelConsumerPool { 35 - p := &ParallelConsumerPool{ 36 - maxConcurrency: maxC, 37 - maxQueue: maxQ, 38 - 39 - do: do, 40 - 41 - feeder: make(chan *consumerTask), 42 - active: make(map[string][]*consumerTask), 43 - 44 - ident: ident, 45 - } 46 - 47 - for i := 0; i < maxC; i++ { 48 - go p.worker() 49 - } 50 - 51 - return p 52 - } 53 - 54 - type consumerTask struct { 55 - repo string 56 - val *XRPCStreamEvent 57 - } 58 - 59 - func (p *ParallelConsumerPool) AddWork(ctx context.Context, repo string, val *XRPCStreamEvent) error { 60 - workItemsAdded.WithLabelValues(p.ident).Inc() 61 - t := &consumerTask{ 62 - repo: repo, 63 - val: val, 64 - } 65 - p.lk.Lock() 66 - 67 - a, ok := p.active[repo] 68 - if ok { 69 - p.active[repo] = append(a, t) 70 - p.lk.Unlock() 71 - return nil 72 - } 73 - 74 - p.active[repo] = []*consumerTask{} 75 - p.lk.Unlock() 76 - 77 - select { 78 - case p.feeder <- t: 79 - return nil 80 - case <-ctx.Done(): 81 - return ctx.Err() 82 - } 83 - } 84 - 85 - func (p *ParallelConsumerPool) worker() { 86 - for work := range p.feeder { 87 - for work != nil { 88 - workItemsActive.WithLabelValues(p.ident).Inc() 89 - if err := p.do(context.TODO(), work.val); err != nil { 90 - log.Errorf("event handler failed: %s", err) 91 - } 92 - workItemsProcessed.WithLabelValues(p.ident).Inc() 93 - 94 - p.lk.Lock() 95 - rem, ok := p.active[work.repo] 96 - if !ok { 97 - log.Errorf("should always have an 'active' entry if a worker is processing a job") 98 - } 99 - 100 - if len(rem) == 0 { 101 - delete(p.active, work.repo) 102 - work = nil 103 - } else { 104 - work = rem[0] 105 - p.active[work.repo] = rem[1:] 106 - } 107 - p.lk.Unlock() 108 - } 109 - } 110 - }
+229
events/schedulers/autoscaling/autoscaling.go
··· 1 + package autoscaling 2 + 3 + import ( 4 + "context" 5 + "sync" 6 + "time" 7 + 8 + "github.com/bluesky-social/indigo/events" 9 + "github.com/bluesky-social/indigo/events/schedulers" 10 + logging "github.com/ipfs/go-log" 11 + "github.com/prometheus/client_golang/prometheus" 12 + ) 13 + 14 + var log = logging.Logger("autoscaling-scheduler") 15 + 16 + // Scheduler is a scheduler that will scale up and down the number of workers based on the throughput of the workers. 17 + type Scheduler struct { 18 + concurrency int 19 + maxConcurrency int 20 + 21 + do func(context.Context, *events.XRPCStreamEvent) error 22 + 23 + feeder chan *consumerTask 24 + out chan struct{} 25 + 26 + lk sync.Mutex 27 + active map[string][]*consumerTask 28 + 29 + ident string 30 + 31 + // metrics 32 + itemsAdded prometheus.Counter 33 + itemsProcessed prometheus.Counter 34 + itemsActive prometheus.Counter 35 + workersActive prometheus.Gauge 36 + 37 + // autoscaling 38 + throughputManager *ThroughputManager 39 + autoscaleFrequency time.Duration 40 + autoscalerIn chan struct{} 41 + autoscalerOut chan struct{} 42 + } 43 + 44 + type AutoscaleSettings struct { 45 + Concurrency int 46 + MaxConcurrency int 47 + AutoscaleFrequency time.Duration 48 + ThroughputBucketCount int 49 + ThroughputBucketDuration time.Duration 50 + } 51 + 52 + // DefaultAutoscaleSettings returns the default autoscale settings. 53 + // Concurrency is the number of workers to start with. 54 + // MaxConcurrency is the maximum number of workers to scale up to. 55 + // AutoscaleFrequency is the frequency to check the average throughput. 56 + // ThroughputBucketCount is the number of buckets to use to calculate the average throughput. 57 + // ThroughputBucketDuration is the duration of each bucket. 58 + // By default we check the average throughput over the last 60 seconds with 1 second buckets 59 + // We make an autoscaling decision every 5 seconds. 60 + // We start with 1 worker and scale up to 32 workers. 61 + func DefaultAutoscaleSettings() AutoscaleSettings { 62 + return AutoscaleSettings{ 63 + Concurrency: 1, 64 + MaxConcurrency: 32, 65 + AutoscaleFrequency: 5 * time.Second, 66 + ThroughputBucketCount: 60, 67 + ThroughputBucketDuration: time.Second, 68 + } 69 + } 70 + 71 + func NewScheduler(autoscaleSettings AutoscaleSettings, ident string, do func(context.Context, *events.XRPCStreamEvent) error) *Scheduler { 72 + p := &Scheduler{ 73 + concurrency: autoscaleSettings.Concurrency, 74 + maxConcurrency: autoscaleSettings.MaxConcurrency, 75 + 76 + do: do, 77 + 78 + feeder: make(chan *consumerTask), 79 + active: make(map[string][]*consumerTask), 80 + out: make(chan struct{}), 81 + 82 + ident: ident, 83 + 84 + itemsAdded: schedulers.WorkItemsAdded.WithLabelValues(ident, "autoscaling"), 85 + itemsProcessed: schedulers.WorkItemsProcessed.WithLabelValues(ident, "autoscaling"), 86 + itemsActive: schedulers.WorkItemsActive.WithLabelValues(ident, "autoscaling"), 87 + workersActive: schedulers.WorkersActive.WithLabelValues(ident, "autoscaling"), 88 + 89 + // autoscaling 90 + // By default, the ThroughputManager will calculate the average throughput over the last 60 seconds. 91 + throughputManager: NewThroughputManager( 92 + autoscaleSettings.ThroughputBucketCount, 93 + autoscaleSettings.ThroughputBucketDuration, 94 + ), 95 + autoscaleFrequency: autoscaleSettings.AutoscaleFrequency, 96 + autoscalerIn: make(chan struct{}), 97 + autoscalerOut: make(chan struct{}), 98 + } 99 + 100 + for i := 0; i < p.concurrency; i++ { 101 + go p.worker() 102 + } 103 + 104 + go p.autoscale() 105 + 106 + return p 107 + } 108 + 109 + func (p *Scheduler) Shutdown() { 110 + log.Infof("shutting down autoscaling scheduler for %s", p.ident) 111 + 112 + // stop autoscaling 113 + p.autoscalerIn <- struct{}{} 114 + close(p.autoscalerIn) 115 + <-p.autoscalerOut 116 + 117 + log.Info("stopping autoscaling scheduler workers") 118 + // stop workers 119 + for i := 0; i < p.concurrency; i++ { 120 + p.feeder <- &consumerTask{signal: "stop"} 121 + } 122 + close(p.feeder) 123 + 124 + log.Info("waiting for autoscaling scheduler workers to stop") 125 + // wait for all workers to stop 126 + for i := 0; i < p.concurrency; i++ { 127 + <-p.out 128 + } 129 + close(p.out) 130 + 131 + log.Info("stopping autoscaling scheduler throughput manager") 132 + p.throughputManager.Stop() 133 + 134 + log.Info("autoscaling scheduler shutdown complete") 135 + } 136 + 137 + // Add autoscaling function 138 + func (p *Scheduler) autoscale() { 139 + p.throughputManager.Start() 140 + tick := time.NewTicker(p.autoscaleFrequency) 141 + defer tick.Stop() 142 + for { 143 + select { 144 + case <-p.autoscalerIn: 145 + p.autoscalerOut <- struct{}{} 146 + close(p.autoscalerOut) 147 + return 148 + case <-tick.C: 149 + avg := p.throughputManager.AvgThroughput() 150 + if avg > float64(p.concurrency) && p.concurrency < p.maxConcurrency { 151 + p.concurrency++ 152 + go p.worker() 153 + } else if avg < float64(p.concurrency-1) && p.concurrency > 1 { 154 + p.concurrency-- 155 + p.feeder <- &consumerTask{signal: "stop"} 156 + } 157 + } 158 + } 159 + } 160 + 161 + type consumerTask struct { 162 + repo string 163 + val *events.XRPCStreamEvent 164 + signal string 165 + } 166 + 167 + func (p *Scheduler) AddWork(ctx context.Context, repo string, val *events.XRPCStreamEvent) error { 168 + p.itemsAdded.Inc() 169 + p.throughputManager.Add(1) 170 + t := &consumerTask{ 171 + repo: repo, 172 + val: val, 173 + } 174 + p.lk.Lock() 175 + 176 + a, ok := p.active[repo] 177 + if ok { 178 + p.active[repo] = append(a, t) 179 + p.lk.Unlock() 180 + return nil 181 + } 182 + 183 + p.active[repo] = []*consumerTask{} 184 + p.lk.Unlock() 185 + 186 + select { 187 + case p.feeder <- t: 188 + return nil 189 + case <-ctx.Done(): 190 + return ctx.Err() 191 + } 192 + } 193 + 194 + func (p *Scheduler) worker() { 195 + log.Infof("starting autoscaling worker for %s", p.ident) 196 + p.workersActive.Inc() 197 + for work := range p.feeder { 198 + for work != nil { 199 + // Check if the work item contains a signal to stop the worker. 200 + if work.signal == "stop" { 201 + log.Infof("stopping autoscaling worker for %s", p.ident) 202 + p.workersActive.Dec() 203 + p.out <- struct{}{} 204 + return 205 + } 206 + 207 + p.itemsActive.Inc() 208 + if err := p.do(context.TODO(), work.val); err != nil { 209 + log.Errorf("event handler failed: %s", err) 210 + } 211 + p.itemsProcessed.Inc() 212 + 213 + p.lk.Lock() 214 + rem, ok := p.active[work.repo] 215 + if !ok { 216 + log.Errorf("should always have an 'active' entry if a worker is processing a job") 217 + } 218 + 219 + if len(rem) == 0 { 220 + delete(p.active, work.repo) 221 + work = nil 222 + } else { 223 + work = rem[0] 224 + p.active[work.repo] = rem[1:] 225 + } 226 + p.lk.Unlock() 227 + } 228 + } 229 + }
+77
events/schedulers/autoscaling/throughput.go
··· 1 + package autoscaling 2 + 3 + import ( 4 + "sync" 5 + "time" 6 + ) 7 + 8 + // ThroughputManager keeps track of the number of tasks processed per bucketDuration over a specified bucketCount. 9 + type ThroughputManager struct { 10 + mu sync.Mutex 11 + circular []int 12 + pos int 13 + sum int 14 + bucketCount int 15 + bucketDuration time.Duration 16 + in chan struct{} 17 + out chan struct{} 18 + } 19 + 20 + // NewThroughputManager creates a new ThroughputManager with the specified interval. 21 + func NewThroughputManager(bucketCount int, bucketDuration time.Duration) *ThroughputManager { 22 + return &ThroughputManager{ 23 + circular: make([]int, bucketCount), 24 + bucketCount: bucketCount, 25 + bucketDuration: bucketDuration, 26 + } 27 + } 28 + 29 + // Add increments the count of tasks processed in the current bucket 30 + func (m *ThroughputManager) Add(n int) { 31 + m.mu.Lock() 32 + defer m.mu.Unlock() 33 + 34 + // increment the current position's value 35 + m.circular[m.pos] += n 36 + m.sum += n 37 + } 38 + 39 + // AvgThroughput returns the average number of tasks processed per 40 + // bucketDuration over the past bucketCount buckets. 41 + func (m *ThroughputManager) AvgThroughput() float64 { 42 + m.mu.Lock() 43 + defer m.mu.Unlock() 44 + 45 + return float64(m.sum) / float64(m.bucketCount) 46 + } 47 + 48 + // shift shifts the position in the circular buffer every bucketDuration, resetting the old value. 49 + func (m *ThroughputManager) shift() { 50 + tick := time.NewTicker(m.bucketDuration) 51 + for { 52 + select { 53 + case <-tick.C: 54 + m.mu.Lock() 55 + 56 + m.pos = (m.pos + 1) % m.bucketCount 57 + m.sum -= m.circular[m.pos] 58 + m.circular[m.pos] = 0 59 + 60 + m.mu.Unlock() 61 + case <-m.in: 62 + m.out <- struct{}{} 63 + return 64 + } 65 + } 66 + } 67 + 68 + // Start starts the ThroughputManager 69 + // It ticks every bucketDuration, shifting the position in the circular buffer. 70 + func (m *ThroughputManager) Start() { 71 + go m.shift() 72 + } 73 + 74 + func (m *ThroughputManager) Stop() { 75 + m.in <- struct{}{} 76 + <-m.out 77 + }
+26
events/schedulers/metrics.go
··· 1 + package schedulers 2 + 3 + import ( 4 + "github.com/prometheus/client_golang/prometheus" 5 + "github.com/prometheus/client_golang/prometheus/promauto" 6 + ) 7 + 8 + var WorkItemsAdded = promauto.NewCounterVec(prometheus.CounterOpts{ 9 + Name: "indigo_scheduler_work_items_added_total", 10 + Help: "Total number of work items added to the consumer pool", 11 + }, []string{"pool", "scheduler_type"}) 12 + 13 + var WorkItemsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{ 14 + Name: "indigo_scheduler_work_items_processed_total", 15 + Help: "Total number of work items processed by the consumer pool", 16 + }, []string{"pool", "scheduler_type"}) 17 + 18 + var WorkItemsActive = promauto.NewCounterVec(prometheus.CounterOpts{ 19 + Name: "indigo_scheduler_work_items_active_total", 20 + Help: "Total number of work items passed into a worker", 21 + }, []string{"pool", "scheduler_type"}) 22 + 23 + var WorkersActive = promauto.NewGaugeVec(prometheus.GaugeOpts{ 24 + Name: "indigo_scheduler_workers_active", 25 + Help: "Number of workers currently active", 26 + }, []string{"pool", "scheduler_type"})
+146
events/schedulers/parallel/parallel.go
··· 1 + package parallel 2 + 3 + import ( 4 + "context" 5 + "sync" 6 + 7 + "github.com/bluesky-social/indigo/events" 8 + "github.com/bluesky-social/indigo/events/schedulers" 9 + logging "github.com/ipfs/go-log" 10 + 11 + "github.com/prometheus/client_golang/prometheus" 12 + ) 13 + 14 + var log = logging.Logger("parallel-scheduler") 15 + 16 + // Scheduler is a parallel scheduler that will run work on a fixed number of workers 17 + type Scheduler struct { 18 + maxConcurrency int 19 + maxQueue int 20 + 21 + do func(context.Context, *events.XRPCStreamEvent) error 22 + 23 + feeder chan *consumerTask 24 + out chan struct{} 25 + 26 + lk sync.Mutex 27 + active map[string][]*consumerTask 28 + 29 + ident string 30 + 31 + // metrics 32 + itemsAdded prometheus.Counter 33 + itemsProcessed prometheus.Counter 34 + itemsActive prometheus.Counter 35 + workesActive prometheus.Gauge 36 + } 37 + 38 + func NewScheduler(maxC, maxQ int, ident string, do func(context.Context, *events.XRPCStreamEvent) error) *Scheduler { 39 + p := &Scheduler{ 40 + maxConcurrency: maxC, 41 + maxQueue: maxQ, 42 + 43 + do: do, 44 + 45 + feeder: make(chan *consumerTask), 46 + active: make(map[string][]*consumerTask), 47 + out: make(chan struct{}), 48 + 49 + ident: ident, 50 + 51 + itemsAdded: schedulers.WorkItemsAdded.WithLabelValues(ident, "parallel"), 52 + itemsProcessed: schedulers.WorkItemsProcessed.WithLabelValues(ident, "parallel"), 53 + itemsActive: schedulers.WorkItemsActive.WithLabelValues(ident, "parallel"), 54 + workesActive: schedulers.WorkersActive.WithLabelValues(ident, "parallel"), 55 + } 56 + 57 + for i := 0; i < maxC; i++ { 58 + go p.worker() 59 + } 60 + 61 + p.workesActive.Set(float64(maxC)) 62 + 63 + return p 64 + } 65 + 66 + func (p *Scheduler) Shutdown() { 67 + log.Infof("shutting down parallel scheduler for %s", p.ident) 68 + 69 + for i := 0; i < p.maxConcurrency; i++ { 70 + p.feeder <- &consumerTask{ 71 + control: "stop", 72 + } 73 + } 74 + 75 + close(p.feeder) 76 + 77 + for i := 0; i < p.maxConcurrency; i++ { 78 + <-p.out 79 + } 80 + 81 + log.Info("parallel scheduler shutdown complete") 82 + } 83 + 84 + type consumerTask struct { 85 + repo string 86 + val *events.XRPCStreamEvent 87 + control string 88 + } 89 + 90 + func (p *Scheduler) AddWork(ctx context.Context, repo string, val *events.XRPCStreamEvent) error { 91 + p.itemsAdded.Inc() 92 + t := &consumerTask{ 93 + repo: repo, 94 + val: val, 95 + } 96 + p.lk.Lock() 97 + 98 + a, ok := p.active[repo] 99 + if ok { 100 + p.active[repo] = append(a, t) 101 + p.lk.Unlock() 102 + return nil 103 + } 104 + 105 + p.active[repo] = []*consumerTask{} 106 + p.lk.Unlock() 107 + 108 + select { 109 + case p.feeder <- t: 110 + return nil 111 + case <-ctx.Done(): 112 + return ctx.Err() 113 + } 114 + } 115 + 116 + func (p *Scheduler) worker() { 117 + for work := range p.feeder { 118 + for work != nil { 119 + if work.control == "stop" { 120 + p.out <- struct{}{} 121 + return 122 + } 123 + 124 + p.itemsActive.Inc() 125 + if err := p.do(context.TODO(), work.val); err != nil { 126 + log.Errorf("event handler failed: %s", err) 127 + } 128 + p.itemsProcessed.Inc() 129 + 130 + p.lk.Lock() 131 + rem, ok := p.active[work.repo] 132 + if !ok { 133 + log.Errorf("should always have an 'active' entry if a worker is processing a job") 134 + } 135 + 136 + if len(rem) == 0 { 137 + delete(p.active, work.repo) 138 + work = nil 139 + } else { 140 + work = rem[0] 141 + p.active[work.repo] = rem[1:] 142 + } 143 + p.lk.Unlock() 144 + } 145 + } 146 + }
+1
events/schedulers/scheduler.go
··· 1 + package schedulers
+54
events/schedulers/sequential/sequential.go
··· 1 + package sequential 2 + 3 + import ( 4 + "context" 5 + 6 + "github.com/bluesky-social/indigo/events" 7 + "github.com/bluesky-social/indigo/events/schedulers" 8 + logging "github.com/ipfs/go-log" 9 + "github.com/prometheus/client_golang/prometheus" 10 + ) 11 + 12 + var log = logging.Logger("sequential-scheduler") 13 + 14 + // Scheduler is a sequential scheduler that will run work on a single worker 15 + type Scheduler struct { 16 + Do func(context.Context, *events.XRPCStreamEvent) error 17 + 18 + ident string 19 + 20 + // metrics 21 + itemsAdded prometheus.Counter 22 + itemsProcessed prometheus.Counter 23 + itemsActive prometheus.Counter 24 + workersActive prometheus.Gauge 25 + } 26 + 27 + func NewScheduler(ident string, do func(context.Context, *events.XRPCStreamEvent) error) *Scheduler { 28 + p := &Scheduler{ 29 + Do: do, 30 + 31 + ident: ident, 32 + 33 + itemsAdded: schedulers.WorkItemsAdded.WithLabelValues(ident, "sequential"), 34 + itemsProcessed: schedulers.WorkItemsProcessed.WithLabelValues(ident, "sequential"), 35 + itemsActive: schedulers.WorkItemsActive.WithLabelValues(ident, "sequential"), 36 + workersActive: schedulers.WorkersActive.WithLabelValues(ident, "sequential"), 37 + } 38 + 39 + p.workersActive.Set(1) 40 + 41 + return p 42 + } 43 + 44 + func (p *Scheduler) Shutdown() { 45 + p.workersActive.Set(0) 46 + } 47 + 48 + func (s *Scheduler) AddWork(ctx context.Context, repo string, val *events.XRPCStreamEvent) error { 49 + s.itemsAdded.Inc() 50 + s.itemsActive.Inc() 51 + err := s.Do(ctx, val) 52 + s.itemsProcessed.Inc() 53 + return err 54 + }
+8 -1
search/server.go
··· 14 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 15 bsky "github.com/bluesky-social/indigo/api/bsky" 16 16 "github.com/bluesky-social/indigo/events" 17 + "github.com/bluesky-social/indigo/events/schedulers/autoscaling" 17 18 lexutil "github.com/bluesky-social/indigo/lex/util" 18 19 "github.com/bluesky-social/indigo/repo" 19 20 "github.com/bluesky-social/indigo/repomgr" ··· 195 196 }, 196 197 } 197 198 198 - return events.HandleRepoStream(ctx, con, events.NewConsumerPool(8, 32, s.bgshost, rsc.EventHandler)) 199 + return events.HandleRepoStream( 200 + ctx, con, autoscaling.NewScheduler( 201 + autoscaling.DefaultAutoscaleSettings(), 202 + s.bgshost, 203 + rsc.EventHandler, 204 + ), 205 + ) 199 206 } 200 207 201 208 func (s *Server) handleOp(ctx context.Context, op repomgr.EventKind, seq int64, path string, did string, rcid *cid.Cid, rec any) error {
+3 -1
testing/labelmaker_fakedata_test.go
··· 13 13 label "github.com/bluesky-social/indigo/api/label" 14 14 "github.com/bluesky-social/indigo/carstore" 15 15 "github.com/bluesky-social/indigo/events" 16 + "github.com/bluesky-social/indigo/events/schedulers/sequential" 16 17 "github.com/bluesky-social/indigo/labeler" 17 18 "github.com/bluesky-social/indigo/util" 18 19 "github.com/bluesky-social/indigo/xrpc" ··· 109 110 return nil 110 111 }, 111 112 } 112 - if err := events.HandleRepoStream(ctx, con, &events.SequentialScheduler{rsc.EventHandler}); err != nil { 113 + seqScheduler := sequential.NewScheduler("test", rsc.EventHandler) 114 + if err := events.HandleRepoStream(ctx, con, seqScheduler); err != nil { 113 115 fmt.Println(err) 114 116 } 115 117 }()
+3 -1
testing/utils.go
··· 23 23 "github.com/bluesky-social/indigo/bgs" 24 24 "github.com/bluesky-social/indigo/carstore" 25 25 "github.com/bluesky-social/indigo/events" 26 + "github.com/bluesky-social/indigo/events/schedulers/sequential" 26 27 "github.com/bluesky-social/indigo/indexer" 27 28 lexutil "github.com/bluesky-social/indigo/lex/util" 28 29 "github.com/bluesky-social/indigo/models" ··· 540 541 return nil 541 542 }, 542 543 } 543 - if err := events.HandleRepoStream(ctx, con, &events.SequentialScheduler{rsc.EventHandler}); err != nil { 544 + seqScheduler := sequential.NewScheduler("test", rsc.EventHandler) 545 + if err := events.HandleRepoStream(ctx, con, seqScheduler); err != nil { 544 546 fmt.Println(err) 545 547 } 546 548 }()