this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

use fewer channels in outbound events loop (#181)

This is a pre-refactor refactor for some work i'm going to do on the
event persister, the channels here make it difficult to implement
batched persisting so i'm pulling those out now, and then will do the
rest in a separate PR

authored by

Whyrusleeping and committed by
GitHub
8912d95e 8162bdb6

+55 -86
-2
cmd/bigsky/main.go
··· 180 180 181 181 evtman := events.NewEventManager(dbp) 182 182 183 - go evtman.Run() 184 - 185 183 notifman := &notifs.NullNotifs{} 186 184 187 185 ix, err := indexer.NewIndexer(db, notifman, evtman, cachedidr, repoman, true, cctx.Bool("aggregation"))
+55 -78
events/events.go
··· 4 4 "context" 5 5 "errors" 6 6 "fmt" 7 + "sync" 7 8 8 9 comatproto "github.com/bluesky-social/indigo/api/atproto" 9 10 label "github.com/bluesky-social/indigo/api/label" ··· 16 17 var log = logging.Logger("events") 17 18 18 19 type EventManager struct { 19 - subs []*Subscriber 20 + subs []*Subscriber 21 + subsLk sync.Mutex 20 22 21 - ops chan *Operation 22 - closed chan struct{} 23 23 bufferSize int 24 24 25 25 persister EventPersistence ··· 27 27 28 28 func NewEventManager(persister EventPersistence) *EventManager { 29 29 return &EventManager{ 30 - ops: make(chan *Operation), 31 - closed: make(chan struct{}), 32 30 bufferSize: 1024, 33 31 persister: persister, 34 32 } ··· 46 44 evt *XRPCStreamEvent 47 45 } 48 46 49 - func (em *EventManager) Run() { 50 - for op := range em.ops { 51 - switch op.op { 52 - case opSubscribe: 53 - em.subs = append(em.subs, op.sub) 54 - case opUnsubscribe: 55 - for i, s := range em.subs { 56 - if s == op.sub { 57 - em.subs[i] = em.subs[len(em.subs)-1] 58 - em.subs = em.subs[:len(em.subs)-1] 59 - break 60 - } 61 - } 62 - case opSend: 63 - if err := em.persister.Persist(context.TODO(), op.evt); err != nil { 64 - log.Errorf("failed to persist outbound event: %s", err) 65 - } 47 + func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) { 48 + // NOTE: Assumes subsLk is held 66 49 67 - for _, s := range em.subs { 68 - if s.filter(op.evt) { 69 - select { 70 - case s.outgoing <- op.evt: 71 - case <-s.done: 72 - go func(torem *Subscriber) { 73 - select { 74 - case em.ops <- &Operation{ 75 - op: opUnsubscribe, 76 - sub: torem, 77 - }: 78 - case <-em.closed: 79 - } 80 - }(s) 81 - default: 82 - log.Warnf("event overflow (%d)", len(s.outgoing)) 83 - } 84 - } 50 + // TODO: for a larger fanout we should probably have dedicated goroutines 51 + // for subsets of the subscriber set, and tiered channels to distribute 52 + // events out to them, or some similar architecture 53 + // Alternatively, we might just want to not allow too many subscribers 54 + // directly to the bgs, and have rebroadcasting proxies instead 55 + for _, s := range em.subs { 56 + if s.filter(evt) { 57 + select { 58 + case s.outgoing <- evt: 59 + case <-s.done: 60 + go func(torem *Subscriber) { 61 + em.rmSubscriber(torem) 62 + }(s) 63 + default: 64 + log.Warnf("event overflow (%d)", len(s.outgoing)) 85 65 } 86 - default: 87 - log.Errorf("unrecognized eventmgr operation: %d", op.op) 88 66 } 89 67 } 90 68 } 91 69 70 + func (em *EventManager) persistAndSendEvent(ctx context.Context, evt *XRPCStreamEvent) { 71 + em.subsLk.Lock() 72 + defer em.subsLk.Unlock() 73 + 74 + if err := em.persister.Persist(context.TODO(), evt); err != nil { 75 + log.Errorf("failed to persist outbound event: %s", err) 76 + } 77 + 78 + em.broadcastEvent(evt) 79 + } 80 + 92 81 type Subscriber struct { 93 82 outgoing chan *XRPCStreamEvent 94 83 ··· 132 121 ctx, span := otel.Tracer("events").Start(ctx, "AddEvent") 133 122 defer span.End() 134 123 135 - select { 136 - case em.ops <- &Operation{ 137 - op: opSend, 138 - evt: ev, 139 - }: 140 - return nil 141 - case <-em.closed: 142 - return fmt.Errorf("event manager shut down") 143 - } 144 - } 145 - 146 - func (em *EventManager) AddLabelEvent(ev *XRPCStreamEvent) error { 147 - select { 148 - case em.ops <- &Operation{ 149 - op: opSend, 150 - evt: ev, 151 - }: 152 - return nil 153 - case <-em.closed: 154 - return fmt.Errorf("event manager shut down") 155 - } 124 + em.persistAndSendEvent(ctx, ev) 125 + return nil 156 126 } 157 127 158 128 var ErrPlaybackShutdown = fmt.Errorf("playback shutting down") ··· 194 164 default: 195 165 } 196 166 197 - select { 198 - case em.ops <- &Operation{ 199 - op: opSubscribe, 200 - sub: sub, 201 - }: 202 - case <-em.closed: 203 - log.Errorf("failed to subscribe, event manager shut down") 204 - } 167 + em.addSubscriber(sub) 205 168 }() 206 169 207 170 cleanup := func() { 208 171 close(done) 209 - select { 210 - case em.ops <- &Operation{ 211 - op: opUnsubscribe, 212 - sub: sub, 213 - }: 214 - case <-em.closed: 172 + em.rmSubscriber(sub) 173 + } 174 + 175 + return sub.outgoing, cleanup, nil 176 + } 177 + 178 + func (em *EventManager) rmSubscriber(sub *Subscriber) { 179 + em.subsLk.Lock() 180 + defer em.subsLk.Unlock() 181 + 182 + for i, s := range em.subs { 183 + if s == sub { 184 + em.subs[i] = em.subs[len(em.subs)-1] 185 + em.subs = em.subs[:len(em.subs)-1] 186 + break 215 187 } 216 188 } 189 + } 217 190 218 - return sub.outgoing, cleanup, nil 191 + func (em *EventManager) addSubscriber(sub *Subscriber) { 192 + em.subsLk.Lock() 193 + defer em.subsLk.Unlock() 194 + 195 + em.subs = append(em.subs, sub) 219 196 } 220 197 221 198 func (em *EventManager) TakeDownRepo(ctx context.Context, user util.Uid) error {
-2
labeler/service.go
··· 117 117 } 118 118 s.bgsSlurper = slurp 119 119 120 - go evtmgr.Run() 121 - 122 120 return s, nil 123 121 } 124 122
-2
pds/server.go
··· 106 106 107 107 s.feedgen = feedgen 108 108 109 - go evtman.Run() 110 - 111 109 return s, nil 112 110 } 113 111
-2
testing/utils.go
··· 396 396 397 397 evtman := events.NewEventManager(dbpersist) 398 398 399 - go evtman.Run() 400 - 401 399 ix, err := indexer.NewIndexer(maindb, notifman, evtman, didr, repoman, true, true) 402 400 if err != nil { 403 401 return nil, err