this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

scheduler: add lastSeq tracking

+39 -4
+39 -4
cmd/relay/stream/schedulers/parallel/parallel.go
··· 4 4 "context" 5 5 "log/slog" 6 6 "sync" 7 + "sync/atomic" 7 8 8 9 "github.com/bluesky-social/indigo/cmd/relay/stream" 9 10 "github.com/bluesky-social/indigo/cmd/relay/stream/schedulers" ··· 26 27 27 28 ident string 28 29 30 + // sequence number tracking 31 + inflightSeq map[int64]bool 32 + lastSeq atomic.Int64 33 + 29 34 // metrics 30 35 itemsAdded prometheus.Counter 31 36 itemsProcessed prometheus.Counter 32 37 itemsActive prometheus.Counter 33 - workesActive prometheus.Gauge 38 + workersActive prometheus.Gauge 34 39 35 40 log *slog.Logger 36 41 } ··· 46 51 active: make(map[string][]*consumerTask), 47 52 out: make(chan struct{}), 48 53 49 - ident: ident, 54 + ident: ident, 55 + inflightSeq: make(map[int64]bool), 50 56 51 57 itemsAdded: schedulers.WorkItemsAdded.WithLabelValues(ident, "parallel"), 52 58 itemsProcessed: schedulers.WorkItemsProcessed.WithLabelValues(ident, "parallel"), 53 59 itemsActive: schedulers.WorkItemsActive.WithLabelValues(ident, "parallel"), 54 - workesActive: schedulers.WorkersActive.WithLabelValues(ident, "parallel"), 60 + workersActive: schedulers.WorkersActive.WithLabelValues(ident, "parallel"), 55 61 56 62 log: slog.Default().With("system", "parallel-scheduler"), 57 63 } ··· 60 66 go p.worker() 61 67 } 62 68 63 - p.workesActive.Set(float64(maxC)) 69 + p.workersActive.Set(float64(maxC)) 64 70 65 71 return p 66 72 } ··· 97 103 } 98 104 p.lk.Lock() 99 105 106 + // mark sequence number as being worked on 107 + seq := val.Sequence() 108 + if seq > 0 { 109 + p.inflightSeq[seq] = true 110 + } 111 + 100 112 a, ok := p.active[repo] 101 113 if ok { 102 114 p.active[repo] = append(a, t) ··· 124 136 } 125 137 126 138 p.itemsActive.Inc() 139 + seq := work.val.Sequence() 127 140 if err := p.do(context.TODO(), work.val); err != nil { 128 141 p.log.Error("event handler failed", "err", err) 129 142 } ··· 142 155 work = rem[0] 143 156 p.active[work.repo] = rem[1:] 144 157 } 158 + 159 + // remove sequence number from inflight set, and update lastSeq if it was the "oldest" 160 + // TODO: do we need backpressure to prevent the inflight set from growing unbounded if a single event from a host is hung? or timeouts on event processing? 161 + if seq > 0 { 162 + delete(p.inflightSeq, seq) 163 + lowest := true 164 + for k := range p.inflightSeq { 165 + if k < seq { 166 + lowest = false 167 + break 168 + } 169 + } 170 + if lowest { 171 + //p.log.Trace("updating lastSeq", "seq", seq, "lastSeq", p.lastSeq.Load(), "inflight", p.inflightSeq) 172 + p.lastSeq.Store(seq) 173 + } 174 + } 175 + 145 176 p.lk.Unlock() 146 177 } 147 178 } 148 179 } 180 + 181 + func (p *Scheduler) LastSeq() int64 { 182 + return p.lastSeq.Load() 183 + }