this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Handle channel closure gracefully without panicing (#427)

authored by

Whyrusleeping and committed by
GitHub
f654c942 668f3da4

+22 -8
+22 -8
events/events.go
··· 77 77 default: 78 78 log.Warnw("dropping slow consumer due to event overflow", "bufferSize", len(s.outgoing), "ident", s.ident) 79 79 go func(torem *Subscriber) { 80 - select { 81 - case torem.outgoing <- &XRPCStreamEvent{ 82 - Error: &ErrorFrame{ 83 - Error: "ConsumerTooSlow", 84 - }, 85 - }: 86 - case <-time.After(time.Second * 5): 87 - log.Warnw("failed to send error frame to backed up consumer", "ident", torem.ident) 80 + torem.lk.Lock() 81 + if !torem.cleanedUp { 82 + select { 83 + case torem.outgoing <- &XRPCStreamEvent{ 84 + Error: &ErrorFrame{ 85 + Error: "ConsumerTooSlow", 86 + }, 87 + }: 88 + case <-time.After(time.Second * 5): 89 + log.Warnw("failed to send error frame to backed up consumer", "ident", torem.ident) 90 + } 88 91 } 92 + torem.lk.Unlock() 89 93 torem.cleanup() 90 94 }(s) 91 95 } ··· 111 115 done chan struct{} 112 116 113 117 cleanup func() 118 + 119 + lk sync.Mutex 120 + cleanedUp bool 114 121 115 122 ident string 116 123 enqueuedCounter prometheus.Counter ··· 177 184 } 178 185 179 186 sub.cleanup = sync.OnceFunc(func() { 187 + sub.lk.Lock() 188 + defer sub.lk.Unlock() 180 189 close(done) 181 190 em.rmSubscriber(sub) 182 191 close(sub.outgoing) 192 + sub.cleanedUp = true 183 193 }) 184 194 185 195 if since == nil { ··· 260 270 261 271 func sequenceForEvent(evt *XRPCStreamEvent) int64 { 262 272 switch { 273 + case evt == nil: 274 + return -1 263 275 case evt.RepoCommit != nil: 264 276 return evt.RepoCommit.Seq 265 277 case evt.RepoHandle != nil: ··· 269 281 case evt.RepoTombstone != nil: 270 282 return evt.RepoTombstone.Seq 271 283 case evt.RepoInfo != nil: 284 + return -1 285 + case evt.Error != nil: 272 286 return -1 273 287 default: 274 288 return -1