this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

dont wait for ack to send historical events

dholms 2733d637 73ca5ea9

+69 -17
+69 -17
nexus/outbox.go
··· 17 17 type InFlightEvent struct { 18 18 ID uint 19 19 DID string 20 + Live bool 20 21 Event *OutboxEvt 21 22 SentAt time.Time 23 + } 24 + 25 + type DIDInFlight struct { 26 + LiveCount int 27 + HistoricalCount int 22 28 } 23 29 24 30 type Outbox struct { ··· 30 36 webhookURL string 31 37 httpClient *http.Client 32 38 inFlightEvents map[uint]*InFlightEvent 33 - inFlightDIDs map[string]uint 39 + inFlightDIDs map[string]*DIDInFlight 34 40 inFlightMu sync.RWMutex 35 41 } 36 42 ··· 46 52 Timeout: 30 * time.Second, 47 53 }, 48 54 inFlightEvents: make(map[uint]*InFlightEvent), 49 - inFlightDIDs: make(map[string]uint), 55 + inFlightDIDs: make(map[string]*DIDInFlight), 50 56 } 51 57 } 52 58 53 59 func (o *Outbox) Run(ctx context.Context) { 54 - ticker := time.NewTicker(100 * time.Millisecond) 60 + ticker := time.NewTicker(1000 * time.Millisecond) 55 61 defer ticker.Stop() 56 62 57 63 // Start timeout checker for websocket-ack mode ··· 99 105 continue 100 106 } 101 107 102 - // Set the ID from the database record 103 108 outboxEvt.ID = evt.ID 104 109 did := outboxEvt.DID() 105 110 106 - // Check if this DID already has an in-flight event 111 + // check if events for the same DID are in flight and we should skip the event for now 112 + // it will remain in the DB & be tried again later 107 113 o.inFlightMu.RLock() 108 - _, hasInFlight := o.inFlightDIDs[did] 114 + counts, didExists := o.inFlightDIDs[did] 115 + _, evtExists := o.inFlightEvents[evt.ID] 109 116 o.inFlightMu.RUnlock() 110 117 111 - if hasInFlight { 112 - // Skip this event for now, will be retried later 118 + if evtExists { 113 119 continue 120 + } else if didExists { 121 + // Live events: wait for ALL in-flight events to clear (historical and live) 122 + if evt.Live && (counts.LiveCount > 0 || counts.HistoricalCount > 0) { 123 + continue 124 + } 125 + // Historical events: only wait for in-flight live events 126 + if !evt.Live && counts.LiveCount > 0 { 127 + continue 128 + } 114 129 } 115 130 116 - o.trackInFlight(&evt, &outboxEvt, did) 131 + o.trackInFlight(&evt, &outboxEvt, did, evt.Live) 117 132 118 133 // Deliver based on consumer mode 119 134 switch o.mode { ··· 127 142 } 128 143 } 129 144 130 - func (o *Outbox) trackInFlight(dbEvt *models.OutboxBuffer, outboxEvt *OutboxEvt, did string) { 145 + func (o *Outbox) trackInFlight(dbEvt *models.OutboxBuffer, outboxEvt *OutboxEvt, did string, isLive bool) { 131 146 o.inFlightMu.Lock() 132 147 o.inFlightEvents[dbEvt.ID] = &InFlightEvent{ 133 148 ID: dbEvt.ID, 134 149 DID: did, 150 + Live: isLive, 135 151 Event: outboxEvt, 136 152 SentAt: time.Now(), 137 153 } 138 - o.inFlightDIDs[did] = dbEvt.ID 154 + 155 + // Increment the appropriate counter 156 + counts, exists := o.inFlightDIDs[did] 157 + if !exists { 158 + counts = &DIDInFlight{} 159 + o.inFlightDIDs[did] = counts 160 + } 161 + if isLive { 162 + counts.LiveCount++ 163 + } else { 164 + counts.HistoricalCount++ 165 + } 139 166 o.inFlightMu.Unlock() 140 167 } 141 168 ··· 146 173 // Check if this was a tracked event (websocket-ack or webhook mode) 147 174 inFlight, exists := o.inFlightEvents[eventID] 148 175 if exists { 149 - // Remove from in-flight tracking 150 - delete(o.inFlightDIDs, inFlight.DID) 176 + // Decrement the appropriate counter 177 + did := inFlight.DID 178 + counts := o.inFlightDIDs[did] 179 + if inFlight.Live { 180 + counts.LiveCount-- 181 + } else { 182 + counts.HistoricalCount-- 183 + } 184 + 185 + // If no more events for this DID, remove the key 186 + if counts.LiveCount == 0 && counts.HistoricalCount == 0 { 187 + delete(o.inFlightDIDs, did) 188 + } 189 + 151 190 delete(o.inFlightEvents, eventID) 152 - o.logger.Info("event acked", "id", eventID, "did", inFlight.DID) 191 + o.logger.Info("event acked", "id", eventID, "did", did, "live", inFlight.Live) 153 192 } 154 193 155 194 // Delete from DB (for both tracked and untracked events) ··· 214 253 } 215 254 } 216 255 256 + // this just clears them out of tracking which will allow them to be retried on the next deliverPend 217 257 func (o *Outbox) retryTimedOutEvents() { 218 258 o.inFlightMu.Lock() 219 259 defer o.inFlightMu.Unlock() ··· 221 261 now := time.Now() 222 262 for id, inFlight := range o.inFlightEvents { 223 263 if now.Sub(inFlight.SentAt) > 10*time.Second { 224 - o.logger.Info("event timed out, resending", "id", id, "did", inFlight.DID) 264 + o.logger.Info("event timed out, resending", "id", id, "did", inFlight.DID, "live", inFlight.Live) 265 + 266 + // Decrement the appropriate counter 267 + did := inFlight.DID 268 + counts := o.inFlightDIDs[did] 269 + if inFlight.Live { 270 + counts.LiveCount-- 271 + } else { 272 + counts.HistoricalCount-- 273 + } 274 + 275 + // If no more events for this DID, remove the key 276 + if counts.LiveCount == 0 && counts.HistoricalCount == 0 { 277 + delete(o.inFlightDIDs, did) 278 + } 225 279 226 - // Remove from in-flight tracking so it can be retried 227 - delete(o.inFlightDIDs, inFlight.DID) 228 280 delete(o.inFlightEvents, id) 229 281 } 230 282 }