this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

drop consumers if their out buffer fills up (#417)

right now if these buffers fill up we just start dropping events, and
then when the buffers drain we just resume sending events. this seems
quite wrong.

authored by

Whyrusleeping and committed by
GitHub
d47a672f 32b3dce0

+19 -7
+19 -7
events/events.go
··· 5 5 "errors" 6 6 "fmt" 7 7 "sync" 8 + "time" 8 9 9 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 10 11 label "github.com/bluesky-social/indigo/api/label" ··· 73 74 select { 74 75 case s.outgoing <- evt: 75 76 case <-s.done: 77 + default: 78 + log.Warnw("dropping slow consumer due to event overflow", "bufferSize", len(s.outgoing), "ident", s.ident) 76 79 go func(torem *Subscriber) { 77 - em.rmSubscriber(torem) 80 + select { 81 + case torem.outgoing <- &XRPCStreamEvent{ 82 + Error: &ErrorFrame{ 83 + Error: "ConsumerTooSlow", 84 + }, 85 + }: 86 + case <-time.After(time.Second * 5): 87 + log.Warnw("failed to send error frame to backed up consumer", "ident", torem.ident) 88 + } 89 + torem.cleanup() 78 90 }(s) 79 - default: 80 - log.Warnf("event overflow (%d)", len(s.outgoing)) 81 91 } 82 92 s.broadcastCounter.Inc() 83 93 } ··· 99 109 filter func(*XRPCStreamEvent) bool 100 110 101 111 done chan struct{} 112 + 113 + cleanup func() 102 114 103 115 ident string 104 116 enqueuedCounter prometheus.Counter ··· 164 176 broadcastCounter: eventsBroadcast.WithLabelValues(ident), 165 177 } 166 178 167 - cleanup := func() { 179 + sub.cleanup = sync.OnceFunc(func() { 168 180 close(done) 169 181 em.rmSubscriber(sub) 170 182 close(sub.outgoing) 171 - } 183 + }) 172 184 173 185 if since == nil { 174 186 em.addSubscriber(sub) 175 - return sub.outgoing, cleanup, nil 187 + return sub.outgoing, sub.cleanup, nil 176 188 } 177 189 178 190 out := make(chan *XRPCStreamEvent, em.bufferSize) ··· 243 255 } 244 256 }() 245 257 246 - return out, cleanup, nil 258 + return out, sub.cleanup, nil 247 259 } 248 260 249 261 func sequenceForEvent(evt *XRPCStreamEvent) int64 {