···11+package autoscaling
22+33+import (
44+ "context"
55+ "sync"
66+ "time"
77+88+ "github.com/bluesky-social/indigo/events"
99+ "github.com/labstack/gommon/log"
1010+ "github.com/prometheus/client_golang/prometheus"
1111+)
1212+1313+type ConsumerPool struct {
1414+ concurrency int
1515+ maxConcurrency int
1616+1717+ do func(context.Context, *events.XRPCStreamEvent) error
1818+1919+ feeder chan *consumerTask
2020+2121+ lk sync.Mutex
2222+ active map[string][]*consumerTask
2323+2424+ ident string
2525+2626+ // metrics
2727+ itemsAdded prometheus.Counter
2828+ itemsProcessed prometheus.Counter
2929+ itemsActive prometheus.Counter
3030+ workersAcrive prometheus.Gauge
3131+3232+ // autoscaling
3333+ throughputManager *ThroughputManager
3434+}
3535+3636+func NewConsumerPool(concurrency, maxC int, ident string, do func(context.Context, *events.XRPCStreamEvent) error) *ConsumerPool {
3737+ p := &ConsumerPool{
3838+ concurrency: concurrency,
3939+ maxConcurrency: maxC,
4040+4141+ do: do,
4242+4343+ feeder: make(chan *consumerTask),
4444+ active: make(map[string][]*consumerTask),
4545+4646+ ident: ident,
4747+4848+ itemsAdded: workItemsAdded.WithLabelValues(ident, "autoscaling"),
4949+ itemsProcessed: workItemsProcessed.WithLabelValues(ident, "autoscaling"),
5050+ itemsActive: workItemsActive.WithLabelValues(ident, "autoscaling"),
5151+5252+ // autoscaling
5353+ // By default, the ThroughputManager will calculate the average throughput over the last 60 seconds.
5454+ throughputManager: NewThroughputManager(60),
5555+ }
5656+5757+ for i := 0; i < concurrency; i++ {
5858+ go p.worker()
5959+ }
6060+6161+ go p.autoscale()
6262+6363+ return p
6464+}
6565+6666+// Add autoscaling function
6767+func (p *ConsumerPool) autoscale() {
6868+ p.throughputManager.Start()
6969+ tick := time.NewTicker(time.Second * 5) // adjust as needed
7070+ for range tick.C {
7171+ avg := p.throughputManager.AvgThroughput()
7272+ if avg > float64(p.concurrency) && p.concurrency < p.maxConcurrency {
7373+ p.concurrency++
7474+ go p.worker()
7575+ } else if avg < float64(p.concurrency-1) && p.concurrency > 1 {
7676+ p.concurrency--
7777+ p.feeder <- &consumerTask{signal: "stop"}
7878+ }
7979+ }
8080+}
8181+8282+type consumerTask struct {
8383+ repo string
8484+ val *events.XRPCStreamEvent
8585+ signal string
8686+}
8787+8888+func (p *ConsumerPool) AddWork(ctx context.Context, repo string, val *events.XRPCStreamEvent) error {
8989+ p.itemsAdded.Inc()
9090+ p.throughputManager.Add(1)
9191+ t := &consumerTask{
9292+ repo: repo,
9393+ val: val,
9494+ }
9595+ p.lk.Lock()
9696+9797+ a, ok := p.active[repo]
9898+ if ok {
9999+ p.active[repo] = append(a, t)
100100+ p.lk.Unlock()
101101+ return nil
102102+ }
103103+104104+ p.active[repo] = []*consumerTask{}
105105+ p.lk.Unlock()
106106+107107+ select {
108108+ case p.feeder <- t:
109109+ return nil
110110+ case <-ctx.Done():
111111+ return ctx.Err()
112112+ }
113113+}
114114+115115+func (p *ConsumerPool) worker() {
116116+ log.Infof("starting autoscaling worker for %s", p.ident)
117117+ p.workersAcrive.Inc()
118118+ for work := range p.feeder {
119119+ for work != nil {
120120+ // Check if the work item contains a signal to stop the worker.
121121+ if work.signal == "stop" {
122122+ log.Infof("stopping autoscaling worker for %s", p.ident)
123123+ p.workersAcrive.Dec()
124124+ return
125125+ }
126126+127127+ p.itemsActive.Inc()
128128+ if err := p.do(context.TODO(), work.val); err != nil {
129129+ log.Errorf("event handler failed: %s", err)
130130+ }
131131+ p.itemsProcessed.Inc()
132132+133133+ p.lk.Lock()
134134+ rem, ok := p.active[work.repo]
135135+ if !ok {
136136+ log.Errorf("should always have an 'active' entry if a worker is processing a job")
137137+ }
138138+139139+ if len(rem) == 0 {
140140+ delete(p.active, work.repo)
141141+ work = nil
142142+ } else {
143143+ work = rem[0]
144144+ p.active[work.repo] = rem[1:]
145145+ }
146146+ p.lk.Unlock()
147147+ }
148148+ }
149149+}
+26
events/autoscaling/metrics.go
···11+package autoscaling
22+33+import (
44+ "github.com/prometheus/client_golang/prometheus"
55+ "github.com/prometheus/client_golang/prometheus/promauto"
66+)
77+88+var workItemsAdded = promauto.NewCounterVec(prometheus.CounterOpts{
99+ Name: "indigo_pool_work_items_added_total",
1010+ Help: "Total number of work items added to the consumer pool",
1111+}, []string{"pool", "pool_type"})
1212+1313+var workItemsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
1414+ Name: "indigo_pool_work_items_processed_total",
1515+ Help: "Total number of work items processed by the consumer pool",
1616+}, []string{"pool", "pool_type"})
1717+1818+var workItemsActive = promauto.NewCounterVec(prometheus.CounterOpts{
1919+ Name: "indigo_pool_work_items_active_total",
2020+ Help: "Total number of work items passed into a worker",
2121+}, []string{"pool", "pool_type"})
2222+2323+var workersActive = promauto.NewGaugeVec(prometheus.GaugeOpts{
2424+ Name: "indigo_pool_workers_active",
2525+ Help: "Number of workers currently active",
2626+}, []string{"pool", "pool_type"})
+61
events/autoscaling/throughput.go
···11+package autoscaling
22+33+import (
44+ "sync"
55+ "time"
66+)
// ThroughputManager keeps a rolling count of tasks recorded per second over a
// window of one-second buckets. Add records into the current bucket; once
// Start has been called, a background goroutine moves to the next bucket every
// second, discarding the oldest one.
type ThroughputManager struct {
	mu       sync.Mutex
	circular []int // one counter per second of the window
	pos      int   // index of the bucket currently being filled
	sum      int   // running total across all buckets
	interval int   // window length in seconds; equals len(circular)
}

// NewThroughputManager creates a ThroughputManager whose window spans
// interval seconds. Values below 1 are treated as 1, since a window with no
// buckets cannot record anything (Add would index past the slice and the
// rotation would take a modulo by zero).
func NewThroughputManager(interval int) *ThroughputManager {
	if interval < 1 {
		interval = 1
	}
	return &ThroughputManager{
		circular: make([]int, interval),
		interval: interval,
	}
}

// Add records n tasks in the current one-second bucket.
func (m *ThroughputManager) Add(n int) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.circular[m.pos] += n
	m.sum += n
}

// AvgThroughput returns the number of tasks recorded in the window divided by
// the window length in seconds. The divisor is always the full window length,
// so the value is an underestimate until the window has been filled once.
func (m *ThroughputManager) AvgThroughput() float64 {
	m.mu.Lock()
	defer m.mu.Unlock()

	return float64(m.sum) / float64(m.interval)
}

// advance moves to the next bucket, dropping whatever that bucket held (the
// oldest second in the window) from the running sum before clearing it.
func (m *ThroughputManager) advance() {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.pos = (m.pos + 1) % m.interval
	m.sum -= m.circular[m.pos]
	m.circular[m.pos] = 0
}

// shift advances the circular buffer once per second. It loops for as long as
// the ticker fires and has no stop condition.
func (m *ThroughputManager) shift() {
	tick := time.NewTicker(time.Second)
	for range tick.C {
		m.advance()
	}
}

// Start launches the background goroutine that rotates the window every
// second. Each call starts another rotating goroutine, so it should be called
// once per ThroughputManager.
func (m *ThroughputManager) Start() {
	go m.shift()
}