Monorepo for Tangled tangled.org
859
fork

Configure Feed

Select the types of activity you want to include in your feed.

eventconsumer: add RemoveSource and consolidate per-source state #295

open opened by mitchellh.com targeting master from mitchellh.com/tangled-core: push-rrlqlkktnoln

Adds RemoveSource to stop streaming from a previously registered source. Removing a source cancels its connection loop and closes any in-flight websocket so the loop exits promptly instead of waiting for the next reconnect tick.

Per-source runtime state (the loop's cancel func and the active websocket conn) was previously split across two sync.Maps and a separate map of registered sources, each with its own locking. That left several races, e.g. the conn was stored in connMap only after a successful dial, so a remove during the dial would miss it.

The new design collapses everything into a single sources map[Source]*sourceState guarded by sourcesMu. The lock is only held for short, non-blocking map mutations and is always released before any side effect possibly-blocking calls.

runConnection now re-checks under the lock that the source is still registered (and the ctx not cancelled) before installing a freshly dialed conn, and its deferred cleanup only clears the conn slot if the entry still points at the conn it installed. This makes 'remove during dial' and 'remove during read' both deterministic: the cancel cuts the loop, the close breaks ReadMessage, and a concurrent runConnection cannot resurrect state for a removed source.

Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:onu3oqfahfubgbetlr4giknc/sh.tangled.repo.pull/3mksxpu475m22
+80 -20
Diff #0
+80 -20
eventconsumer/consumer.go
··· 54 54 type Consumer struct { 55 55 wg sync.WaitGroup 56 56 dialer *websocket.Dialer 57 - connMap sync.Map 58 57 jobQueue chan job 59 58 logger *slog.Logger 60 59 randSource *rand.Rand 61 60 62 - // rw lock over edits to ConsumerConfig 63 - cfgMu sync.RWMutex 64 - cfg ConsumerConfig 61 + // sourcesMu guards sources. It must only be held for short, non-blocking 62 + // map operations; never across a blocking call (dial, read, close). 63 + sourcesMu sync.Mutex 64 + sources map[Source]*sourceState 65 + 66 + cfg ConsumerConfig 67 + } 68 + 69 + type sourceState struct { 70 + cancel context.CancelFunc 71 + conn *websocket.Conn 65 72 } 66 73 67 74 type job struct { ··· 97 104 jobQueue: make(chan job, cfg.QueueSize), // buffered job queue 98 105 logger: cfg.Logger, 99 106 randSource: rand.New(rand.NewSource(time.Now().UnixNano())), 107 + sources: make(map[Source]*sourceState), 100 108 } 101 109 } 102 110 ··· 111 119 112 120 // start streaming 113 121 for source := range c.cfg.Sources { 114 - c.wg.Add(1) 115 - go c.startConnectionLoop(ctx, source) 122 + c.AddSource(ctx, source) 116 123 } 117 124 } 118 125 119 126 func (c *Consumer) Stop() { 120 - c.connMap.Range(func(_, val any) bool { 121 - if conn, ok := val.(*websocket.Conn); ok { 122 - conn.Close() 127 + // snapshot conns under lock so we don't hold sourcesMu across Close 128 + c.sourcesMu.Lock() 129 + conns := make([]*websocket.Conn, 0, len(c.sources)) 130 + for _, st := range c.sources { 131 + if st.conn != nil { 132 + conns = append(conns, st.conn) 123 133 } 124 - return true 125 - }) 134 + } 135 + c.sourcesMu.Unlock() 136 + 137 + for _, conn := range conns { 138 + conn.Close() 139 + } 140 + 126 141 c.wg.Wait() 127 142 close(c.jobQueue) 128 143 } 129 144 130 145 func (c *Consumer) AddSource(ctx context.Context, s Source) { 131 - // we are already listening to this source 132 - if _, ok := c.cfg.Sources[s]; ok { 146 + c.sourcesMu.Lock() 147 + if _, ok := c.sources[s]; ok { 148 + c.sourcesMu.Unlock() 133 149 c.logger.Info("source already present", "source", s) 134 150 return 135 151 } 152 + srcCtx, cancel := context.WithCancel(ctx) 153 + c.sources[s] = &sourceState{cancel: cancel} 154 + c.sourcesMu.Unlock() 136 155 137 - c.cfgMu.Lock() 138 - c.cfg.Sources[s] = struct{}{} 139 156 c.wg.Add(1) 140 - go c.startConnectionLoop(ctx, s) 141 - c.cfgMu.Unlock() 157 + go c.startConnectionLoop(srcCtx, s) 158 + } 159 + 160 + func (c *Consumer) RemoveSource(s Source) { 161 + c.sourcesMu.Lock() 162 + st, ok := c.sources[s] 163 + if !ok { 164 + c.sourcesMu.Unlock() 165 + c.logger.Info("source not present", "source", s) 166 + return 167 + } 168 + delete(c.sources, s) 169 + cancel := st.cancel 170 + conn := st.conn 171 + c.sourcesMu.Unlock() 172 + 173 + // release lock before any potentially blocking call 174 + if cancel != nil { 175 + cancel() 176 + } 177 + if conn != nil { 178 + conn.Close() 179 + } 142 180 } 143 181 144 182 func (c *Consumer) worker(ctx context.Context) { ··· 238 276 return err 239 277 } 240 278 241 - c.connMap.Store(source, conn) 242 - defer conn.Close() 243 - defer c.connMap.Delete(source) 279 + // Register the conn. If the source was removed (or our ctx cancelled) 280 + // while we were dialing, drop this conn instead of installing it. 281 + c.sourcesMu.Lock() 282 + st, ok := c.sources[source] 283 + if !ok || ctx.Err() != nil { 284 + c.sourcesMu.Unlock() 285 + conn.Close() 286 + if ctx.Err() != nil { 287 + return ctx.Err() 288 + } 289 + return nil 290 + } 291 + st.conn = conn 292 + c.sourcesMu.Unlock() 293 + 294 + defer func() { 295 + // Clear the conn from state, but only if it's still our conn (a 296 + // concurrent RemoveSource may have already done it). 297 + c.sourcesMu.Lock() 298 + if st, ok := c.sources[source]; ok && st.conn == conn { 299 + st.conn = nil 300 + } 301 + c.sourcesMu.Unlock() 302 + conn.Close() 303 + }() 244 304 245 305 c.logger.Info("connected", "source", source) 246 306

History

1 round 0 comments
sign up or login to add to the discussion
mitchellh.com submitted #0
1 commit
expand
eventconsumer: add RemoveSource and consolidate per-source state
merge conflicts detected
expand
  • eventconsumer/consumer.go:54
expand 0 comments