this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add a concurrent event processor helper (#70)

Havent integrated this yet, but I figured it would be good to send up
for a review.
Pretty straightforward to make this generic so we can use it for labels
when we need to as well.

authored by

Whyrusleeping and committed by
GitHub
a501a309 d5b32d01

+90 -1
+1 -1
events/events.go
··· 79 79 } 80 80 }(s) 81 81 default: 82 - log.Error("event overflow") 82 + log.Warnf("event overflow (%d)", len(s.outgoing)) 83 83 } 84 84 } 85 85 }
+89
events/parallel.go
··· 1 + package events 2 + 3 + import ( 4 + "context" 5 + "sync" 6 + ) 7 + 8 + type ConsumerPool struct { 9 + maxConcurrency int 10 + maxQueue int 11 + 12 + do func(*XRPCStreamEvent) 13 + 14 + feeder chan *consumerTask 15 + 16 + lk sync.Mutex 17 + active map[string][]*consumerTask 18 + } 19 + 20 + func NewConsumerPool(maxC, maxQ int, do func(*XRPCStreamEvent)) *ConsumerPool { 21 + p := &ConsumerPool{ 22 + maxConcurrency: maxC, 23 + maxQueue: maxQ, 24 + 25 + do: do, 26 + 27 + feeder: make(chan *consumerTask), 28 + active: make(map[string][]*consumerTask), 29 + } 30 + 31 + for i := 0; i < maxC; i++ { 32 + go p.worker() 33 + } 34 + 35 + return p 36 + } 37 + 38 + type consumerTask struct { 39 + repo string 40 + val *XRPCStreamEvent 41 + } 42 + 43 + func (p *ConsumerPool) Add(ctx context.Context, repo string, val *XRPCStreamEvent) error { 44 + t := &consumerTask{ 45 + repo: repo, 46 + val: val, 47 + } 48 + p.lk.Lock() 49 + 50 + a, ok := p.active[repo] 51 + if ok { 52 + p.active[repo] = append(a, t) 53 + p.lk.Unlock() 54 + return nil 55 + } 56 + 57 + p.active[repo] = []*consumerTask{} 58 + p.lk.Unlock() 59 + 60 + select { 61 + case p.feeder <- t: 62 + return nil 63 + case <-ctx.Done(): 64 + return ctx.Err() 65 + } 66 + } 67 + 68 + func (p *ConsumerPool) worker() { 69 + for work := range p.feeder { 70 + for work != nil { 71 + p.do(work.val) 72 + 73 + p.lk.Lock() 74 + rem, ok := p.active[work.repo] 75 + if !ok { 76 + log.Errorf("should always have an 'active' entry if a worker is processing a job") 77 + } 78 + 79 + if len(rem) == 0 { 80 + delete(p.active, work.repo) 81 + work = nil 82 + } else { 83 + work = rem[0] 84 + p.active[work.repo] = rem[1:] 85 + } 86 + p.lk.Unlock() 87 + } 88 + } 89 + }