this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

bgs: ensure output channel is closed if it isn't sub.output (#1066)

The issue here is that when a cursor is supplied, sub.output isn't
actually the channel which is passed along to calling code; the 'out'
chan is what gets returned.

This means that sub.output getting closed needs to be wired through to
'out' getting closed as well.

NOTE: this is in old relay code ("BGS"). If we like this fix, should
copy it to new relay code (`indigo:cmd/relay/stream`) before merging.

authored by

bnewbold and committed by
GitHub
89be8c91 d61b8cb3

+14 -10
+7 -5
cmd/relay/stream/eventmgr/event_manager.go
··· 168 168 } 169 169 170 170 // TODO: send an error frame or something? 171 + // NOTE: not doing em.rmSubscriber(sub) here because it hasn't been added yet 171 172 close(out) 172 173 return 173 174 } ··· 175 176 // now, start buffering events from the live stream 176 177 em.addSubscriber(sub) 177 178 179 + // ensure that we clean up any return paths from here out, after having added the subscriber. Note that `out` is not `sub.output`, so needs to be closed separately. 180 + defer func() { 181 + close(out) 182 + em.rmSubscriber(sub) 183 + }() 184 + 178 185 first := <-sub.outgoing 179 186 180 187 // run playback again to get us to the events that have started buffering ··· 193 200 }); err != nil { 194 201 if !errors.Is(err, ErrCaughtUp) { 195 202 em.log.Error("events playback", "err", err) 196 - 197 - // TODO: send an error frame or something? 198 - close(out) 199 - em.rmSubscriber(sub) 200 203 return 201 204 } 202 205 } ··· 206 209 select { 207 210 case out <- evt: 208 211 case <-done: 209 - em.rmSubscriber(sub) 210 212 return 211 213 } 212 214 }
+7 -5
events/events.go
··· 415 415 } 416 416 417 417 // TODO: send an error frame or something? 418 + // NOTE: not doing em.rmSubscriber(sub) here because it hasn't been added yet 418 419 close(out) 419 420 return 420 421 } ··· 422 423 // now, start buffering events from the live stream 423 424 em.addSubscriber(sub) 424 425 426 + // ensure that we clean up any return paths from here out, after having added the subscriber. Note that `out` is not `sub.output`, so needs to be closed separately. 427 + defer func() { 428 + close(out) 429 + em.rmSubscriber(sub) 430 + }() 431 + 425 432 first := <-sub.outgoing 426 433 427 434 // run playback again to get us to the events that have started buffering ··· 440 447 }); err != nil { 441 448 if !errors.Is(err, ErrCaughtUp) { 442 449 em.log.Error("events playback", "err", err) 443 - 444 - // TODO: send an error frame or something? 445 - close(out) 446 - em.rmSubscriber(sub) 447 450 return 448 451 } 449 452 } ··· 453 456 select { 454 457 case out <- evt: 455 458 case <-done: 456 - em.rmSubscriber(sub) 457 459 return 458 460 } 459 461 }