this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add sequential scheduler

+55 -1
+1 -1
cmd/relayered/stream/schedulers/parallel/parallel.go
··· 6 6 "sync" 7 7 8 8 "github.com/bluesky-social/indigo/cmd/relayered/stream" 9 - "github.com/bluesky-social/indigo/events/schedulers" 9 + "github.com/bluesky-social/indigo/cmd/relayered/stream/schedulers" 10 10 11 11 "github.com/prometheus/client_golang/prometheus" 12 12 )
+54
cmd/relayered/stream/schedulers/sequential/sequential.go
··· 1 + package sequential 2 + 3 + import ( 4 + "context" 5 + 6 + "github.com/bluesky-social/indigo/cmd/relayered/stream" 7 + "github.com/bluesky-social/indigo/cmd/relayered/stream/schedulers" 8 + 9 + "github.com/prometheus/client_golang/prometheus" 10 + ) 11 + 12 + // var log = slog.Default().With("system", "sequential-scheduler") 13 + 14 + // Scheduler is a sequential scheduler that will run work on a single worker 15 + type Scheduler struct { 16 + Do func(context.Context, *stream.XRPCStreamEvent) error 17 + 18 + ident string 19 + 20 + // metrics 21 + itemsAdded prometheus.Counter 22 + itemsProcessed prometheus.Counter 23 + itemsActive prometheus.Counter 24 + workersActive prometheus.Gauge 25 + } 26 + 27 + func NewScheduler(ident string, do func(context.Context, *stream.XRPCStreamEvent) error) *Scheduler { 28 + p := &Scheduler{ 29 + Do: do, 30 + 31 + ident: ident, 32 + 33 + itemsAdded: schedulers.WorkItemsAdded.WithLabelValues(ident, "sequential"), 34 + itemsProcessed: schedulers.WorkItemsProcessed.WithLabelValues(ident, "sequential"), 35 + itemsActive: schedulers.WorkItemsActive.WithLabelValues(ident, "sequential"), 36 + workersActive: schedulers.WorkersActive.WithLabelValues(ident, "sequential"), 37 + } 38 + 39 + p.workersActive.Set(1) 40 + 41 + return p 42 + } 43 + 44 + func (p *Scheduler) Shutdown() { 45 + p.workersActive.Set(0) 46 + } 47 + 48 + func (s *Scheduler) AddWork(ctx context.Context, repo string, val *stream.XRPCStreamEvent) error { 49 + s.itemsAdded.Inc() 50 + s.itemsActive.Inc() 51 + err := s.Do(ctx, val) 52 + s.itemsProcessed.Inc() 53 + return err 54 + }