this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor outbox

dholms 86e2112b 10995378

+309 -147
+1 -1
nexus/nexus.go
··· 150 150 }() 151 151 } 152 152 153 - for i := 0; i < 20; i++ { 153 + for i := 0; i < 50; i++ { 154 154 go n.runResyncWorker(context.Background(), i) 155 155 } 156 156
+305 -136
nexus/outbox.go
··· 14 14 "gorm.io/gorm" 15 15 ) 16 16 17 - type InFlightEvent struct { 18 - ID uint 19 - DID string 20 - Live bool 21 - Event *OutboxEvt 22 - SentAt time.Time 23 - } 17 + // Ordering guarantees for events belonging to the same DID: 18 + // 19 + // Live events are synchronization barriers - all prior events must complete 20 + // before a live event can be sent, and the live event must complete before 21 + // any subsequent events can be sent. 22 + // 23 + // Historical events can be sent concurrently with each other (no ordering 24 + // between them), but cannot be sent while a live event is in-flight. 25 + // 26 + // Example sequence: H1, H2, L1, H3, H4, L2, H5 27 + // - H1 and H2 sent concurrently 28 + // - Wait for H1 and H2 to complete, then send L1 (alone) 29 + // - Wait for L1 to complete, then send H3 and H4 concurrently 30 + // - Wait for H3 and H4 to complete, then send L2 (alone) 31 + // - Wait for L2 to complete, then send H5 24 32 25 - type DIDInFlight struct { 26 - LiveCount int 27 - HistoricalCount int 33 + type DIDWorker struct { 34 + outbox *Outbox 35 + ctx context.Context 36 + did string 37 + notifChan chan struct{} 38 + pendingEvts []uint 39 + inFlightSentAt map[uint]time.Time 40 + blockedOnLive bool 41 + running bool 42 + mu sync.Mutex 28 43 } 29 44 30 45 type Outbox struct { 31 - db *gorm.DB 32 - logger *slog.Logger 33 - notify chan struct{} 34 - events chan *OutboxEvt 35 - mode OutboxMode 36 - webhookURL string 37 - httpClient *http.Client 38 - inFlightEvents map[uint]*InFlightEvent 39 - inFlightDIDs map[string]*DIDInFlight 40 - inFlightMu sync.RWMutex 46 + db *gorm.DB 47 + logger *slog.Logger 48 + events chan *OutboxEvt 49 + mode OutboxMode 50 + webhookURL string 51 + httpClient *http.Client 52 + 53 + eventCache map[uint]*OutboxEvt // ID -> event 54 + pendingIDs chan uint // Buffered channel of event IDs 55 + cacheMu sync.RWMutex 56 + lastLoadedID uint 57 + 58 + didWorkers map[string]*DIDWorker 59 + workersMu sync.Mutex 60 + 61 + ctx context.Context 41 62 } 42 63 43 64 func NewOutbox(db *gorm.DB, mode OutboxMode, webhookURL string) *Outbox { 44 65 return &Outbox{ 45 66 db: db, 46 67 logger: slog.Default().With("system", "outbox"), 47 - notify: make(chan struct{}), 48 - events: make(chan *OutboxEvt, 1000), 68 + events: make(chan *OutboxEvt, 10000), 49 69 mode: mode, 50 70 webhookURL: webhookURL, 51 71 httpClient: &http.Client{ 52 72 Timeout: 30 * time.Second, 53 73 }, 54 - inFlightEvents: make(map[uint]*InFlightEvent), 55 - inFlightDIDs: make(map[string]*DIDInFlight), 74 + eventCache: make(map[uint]*OutboxEvt), 75 + pendingIDs: make(chan uint, 1000000), 76 + didWorkers: make(map[string]*DIDWorker), 56 77 } 57 78 } 58 79 59 80 func (o *Outbox) Run(ctx context.Context) { 60 - ticker := time.NewTicker(1000 * time.Millisecond) 61 - defer ticker.Stop() 81 + o.ctx = ctx 62 82 63 - // Start timeout checker for websocket-ack mode 64 - // Webhook mode doesn't need this since sendWebhook() retries indefinitely 83 + // Webhook mode doesn't need timeout checker since sendWebhook() retries indefinitely 65 84 if o.mode == OutboxModeWebsocketAck { 66 85 go o.checkTimeouts(ctx) 67 86 } 87 + go o.runCacheLoader(ctx) 88 + go o.runDelivery(ctx) 89 + 90 + <-ctx.Done() 91 + } 92 + 93 + // continuously load events from DB into memory cache 94 + func (o *Outbox) runCacheLoader(ctx context.Context) { 95 + ticker := time.NewTicker(10 * time.Millisecond) 96 + defer ticker.Stop() 68 97 69 98 for { 70 99 select { 71 100 case <-ctx.Done(): 72 101 return 73 - case <-o.notify: 74 - o.deliverPending() 75 102 case <-ticker.C: 76 - o.deliverPending() 103 + o.loadMoreEvents() 77 104 } 78 105 } 79 106 } 80 107 81 - // notify outbox of new event in buffer 82 - // for active outboxes, channel will basically always be firing 83 - func (o *Outbox) Notify() { 84 - select { 85 - case o.notify <- struct{}{}: 86 - default: 87 - } 88 - } 108 + func (o *Outbox) loadMoreEvents() { 109 + o.cacheMu.RLock() 110 + lastID := o.lastLoadedID 111 + o.cacheMu.RUnlock() 112 + 113 + batchSize := 1000 89 114 90 - func (o *Outbox) deliverPending() { 91 115 var events []models.OutboxBuffer 92 - if err := o.db.Order("id ASC").Limit(100).Find(&events).Error; err != nil { 93 - o.logger.Error("failed to query outbox buffer", "error", err) 116 + if err := o.db.Where("id > ?", lastID). 117 + Order("id ASC"). 118 + Limit(batchSize). 119 + Find(&events).Error; err != nil { 120 + o.logger.Error("failed to load events into cache", "error", err) 94 121 return 95 122 } 96 123 ··· 98 125 return 99 126 } 100 127 101 - for _, evt := range events { 128 + o.cacheMu.Lock() 129 + for _, dbEvt := range events { 102 130 var outboxEvt OutboxEvt 103 - if err := json.Unmarshal([]byte(evt.Data), &outboxEvt); err != nil { 104 - o.logger.Error("failed to unmarshal outbox event", "error", err, "id", evt.ID) 131 + if err := json.Unmarshal([]byte(dbEvt.Data), &outboxEvt); err != nil { 132 + o.logger.Error("failed to unmarshal cached event", "error", err, "id", dbEvt.ID) 105 133 continue 106 134 } 107 135 108 - outboxEvt.ID = evt.ID 109 - did := outboxEvt.DID() 136 + outboxEvt.ID = dbEvt.ID 137 + o.eventCache[dbEvt.ID] = &outboxEvt 138 + 139 + select { 140 + case o.pendingIDs <- dbEvt.ID: 141 + default: 142 + // Channel full, will try again next time 143 + } 144 + 145 + if dbEvt.ID > o.lastLoadedID { 146 + o.lastLoadedID = dbEvt.ID 147 + } 148 + } 149 + o.cacheMu.Unlock() 150 + 151 + o.logger.Info("loaded events into cache", "count", len(events), "cacheSize", len(o.eventCache)) 152 + } 153 + 154 + // runDelivery continuously pulls from pendingIDs and delivers events 155 + func (o *Outbox) runDelivery(ctx context.Context) { 156 + for { 157 + select { 158 + case <-ctx.Done(): 159 + return 160 + case eventID := <-o.pendingIDs: 161 + o.deliverEvent(eventID) 162 + } 163 + } 164 + } 165 + 166 + func (o *Outbox) deliverEvent(eventID uint) { 167 + o.cacheMu.RLock() 168 + outboxEvt, exists := o.eventCache[eventID] 169 + o.cacheMu.RUnlock() 170 + 171 + if !exists { 172 + // Event was already acked/removed 173 + return 174 + } 175 + 176 + did := outboxEvt.DID() 177 + 178 + o.workersMu.Lock() 179 + worker, exists := o.didWorkers[did] 180 + if !exists { 181 + worker = &DIDWorker{ 182 + did: did, 183 + notifChan: make(chan struct{}, 1), 184 + inFlightSentAt: make(map[uint]time.Time), 185 + outbox: o, 186 + ctx: o.ctx, 187 + } 188 + o.didWorkers[did] = worker 189 + } 190 + o.workersMu.Unlock() 191 + 192 + worker.addEvent(outboxEvt) 193 + } 194 + 195 + func (o *Outbox) sendEvent(evt *OutboxEvt) { 196 + switch o.mode { 197 + case OutboxModeFireAndForget: 198 + o.events <- evt 199 + case OutboxModeWebsocketAck: 200 + o.events <- evt 201 + case OutboxModeWebhook: 202 + go o.sendWebhook(evt) 203 + } 204 + } 205 + 206 + func (w *DIDWorker) run() { 207 + for { 208 + select { 209 + case <-w.ctx.Done(): 210 + return 211 + case <-w.notifChan: 212 + } 213 + 214 + w.processPendingEvts() 215 + 216 + // Check if goroutine should exit 217 + w.mu.Lock() 218 + queueEmpty := len(w.pendingEvts) == 0 219 + noInFlight := len(w.inFlightSentAt) == 0 220 + 221 + if noInFlight { 222 + w.blockedOnLive = false 223 + } 224 + 225 + if queueEmpty && noInFlight { 226 + w.running = false 227 + w.mu.Unlock() 228 + return 229 + } 230 + w.mu.Unlock() 231 + } 232 + } 233 + 234 + // get as many pending events in flight as we can 235 + // returns when it hits a blocking event 236 + func (w *DIDWorker) processPendingEvts() { 237 + for { 238 + w.mu.Lock() 239 + if w.blockedOnLive { 240 + return // can't proceed, break out of send loop and wait for acks 241 + } 110 242 111 - // check if events for the same DID are in flight and we should skip the event for now 112 - // it will remain in the DB & be tried again later 113 - o.inFlightMu.RLock() 114 - counts, didExists := o.inFlightDIDs[did] 115 - _, evtExists := o.inFlightEvents[evt.ID] 116 - o.inFlightMu.RUnlock() 243 + if len(w.pendingEvts) == 0 { 244 + w.mu.Unlock() 245 + return 246 + } 247 + eventID := w.pendingEvts[0] 248 + w.mu.Unlock() 117 249 118 - if evtExists { 250 + w.outbox.cacheMu.RLock() 251 + outboxEvt, exists := w.outbox.eventCache[eventID] 252 + w.outbox.cacheMu.RUnlock() 253 + 254 + if !exists { 255 + // Event was already acked/removed, skip it 256 + w.mu.Lock() 257 + w.pendingEvts = w.pendingEvts[1:] 258 + w.mu.Unlock() 119 259 continue 120 - } else if didExists { 121 - // Live events: wait for ALL in-flight events to clear (historical and live) 122 - if evt.Live && (counts.LiveCount > 0 || counts.HistoricalCount > 0) { 123 - continue 124 - } 125 - // Historical events: only wait for in-flight live events 126 - if !evt.Live && counts.LiveCount > 0 { 127 - continue 128 - } 129 260 } 130 261 131 - o.trackInFlight(&evt, &outboxEvt, did, evt.Live) 262 + isLive := outboxEvt.RecordEvt != nil && outboxEvt.RecordEvt.Live 263 + 264 + w.mu.Lock() 265 + if isLive { 266 + hasInFlight := len(w.inFlightSentAt) > 0 267 + // live event - must wait for all in-flight to clear 268 + if hasInFlight { 269 + w.mu.Unlock() 270 + return 271 + } 132 272 133 - // Deliver based on consumer mode 134 - switch o.mode { 135 - case OutboxModeFireAndForget: 136 - o.events <- &outboxEvt 137 - case OutboxModeWebsocketAck: 138 - o.events <- &outboxEvt 139 - case OutboxModeWebhook: 140 - go o.sendWebhook(evt.ID, &outboxEvt) 273 + w.blockedOnLive = true 274 + } 275 + w.pendingEvts = w.pendingEvts[1:] 276 + w.inFlightSentAt[eventID] = time.Now() 277 + w.mu.Unlock() 278 + 279 + w.outbox.sendEvent(outboxEvt) 280 + if isLive { 281 + return // not going to be able to send anymore in this loop so return for now 141 282 } 142 283 } 143 284 } 144 285 145 - func (o *Outbox) trackInFlight(dbEvt *models.OutboxBuffer, outboxEvt *OutboxEvt, did string, isLive bool) { 146 - o.inFlightMu.Lock() 147 - o.inFlightEvents[dbEvt.ID] = &InFlightEvent{ 148 - ID: dbEvt.ID, 149 - DID: did, 150 - Live: isLive, 151 - Event: outboxEvt, 152 - SentAt: time.Now(), 286 + func (w *DIDWorker) addEvent(evt *OutboxEvt) { 287 + w.mu.Lock() 288 + 289 + hasInFlight := len(w.inFlightSentAt) > 0 290 + 291 + // Fast path: no contention, send immediately without goroutine 292 + if !hasInFlight { 293 + w.inFlightSentAt[evt.ID] = time.Now() 294 + w.mu.Unlock() 295 + w.outbox.sendEvent(evt) 296 + return 153 297 } 154 298 155 - // Increment the appropriate counter 156 - counts, exists := o.inFlightDIDs[did] 157 - if !exists { 158 - counts = &DIDInFlight{} 159 - o.inFlightDIDs[did] = counts 299 + // Slow path: contention exists, need goroutine for ordering 300 + w.pendingEvts = append(w.pendingEvts, evt.ID) 301 + if !w.running { 302 + w.running = true 303 + go w.run() 160 304 } 161 - if isLive { 162 - counts.LiveCount++ 163 - } else { 164 - counts.HistoricalCount++ 305 + w.mu.Unlock() 306 + 307 + select { 308 + case w.notifChan <- struct{}{}: 309 + default: 165 310 } 166 - o.inFlightMu.Unlock() 167 311 } 168 312 169 - func (o *Outbox) AckEvent(eventID uint) { 170 - o.inFlightMu.Lock() 171 - defer o.inFlightMu.Unlock() 313 + func (w *DIDWorker) ackEvent(evtID uint) { 314 + w.mu.Lock() 315 + defer w.mu.Unlock() 172 316 173 - // Check if this was a tracked event (websocket-ack or webhook mode) 174 - inFlight, exists := o.inFlightEvents[eventID] 175 - if exists { 176 - // Decrement the appropriate counter 177 - did := inFlight.DID 178 - counts := o.inFlightDIDs[did] 179 - if inFlight.Live { 180 - counts.LiveCount-- 181 - } else { 182 - counts.HistoricalCount-- 317 + delete(w.inFlightSentAt, evtID) 318 + 319 + select { 320 + case w.notifChan <- struct{}{}: 321 + default: 322 + } 323 + } 324 + 325 + // checkAndRetryTimeouts checks for timed out events and returns their IDs 326 + // Must be called without holding w.mu 327 + func (w *DIDWorker) timedOutEvents() []uint { 328 + w.mu.Lock() 329 + defer w.mu.Unlock() 330 + 331 + var timedOut []uint 332 + now := time.Now() 333 + 334 + for evtId, sentAt := range w.inFlightSentAt { 335 + if now.Sub(sentAt) > 10*time.Second { 336 + timedOut = append(timedOut, evtId) 183 337 } 338 + } 184 339 185 - // If no more events for this DID, remove the key 186 - if counts.LiveCount == 0 && counts.HistoricalCount == 0 { 187 - delete(o.inFlightDIDs, did) 340 + return timedOut 341 + } 342 + 343 + func (o *Outbox) AckEvent(eventID uint) { 344 + o.cacheMu.RLock() 345 + outboxEvt, exists := o.eventCache[eventID] 346 + o.cacheMu.RUnlock() 347 + 348 + if !exists { 349 + // evt not in cache, may have already been acked - still try to delete from DB 350 + if err := o.db.Delete(&models.OutboxBuffer{}, eventID).Error; err != nil { 351 + o.logger.Error("failed to delete acked event", "error", err, "id", eventID) 188 352 } 353 + return 354 + } 355 + 356 + did := outboxEvt.DID() 189 357 190 - delete(o.inFlightEvents, eventID) 191 - o.logger.Info("event acked", "id", eventID, "did", did, "live", inFlight.Live) 358 + o.workersMu.Lock() 359 + worker := o.didWorkers[did] 360 + o.workersMu.Unlock() 361 + 362 + if worker != nil { 363 + worker.ackEvent(eventID) 192 364 } 193 365 194 - // Delete from DB (for both tracked and untracked events) 366 + o.cacheMu.Lock() 367 + delete(o.eventCache, eventID) 368 + o.cacheMu.Unlock() 369 + 195 370 if err := o.db.Delete(&models.OutboxBuffer{}, eventID).Error; err != nil { 196 371 o.logger.Error("failed to delete acked event", "error", err, "id", eventID) 197 372 } 198 373 } 199 374 200 - func (o *Outbox) sendWebhook(eventID uint, evt *OutboxEvt) { 375 + func (o *Outbox) sendWebhook(evt *OutboxEvt) { 201 376 retries := 0 202 377 for { 203 378 if err := o.postWebhook(evt); err != nil { 204 - o.logger.Warn("webhook failed, retrying", "error", err, "id", eventID, "retries", retries) 379 + o.logger.Warn("webhook failed, retrying", "error", err, "id", evt.ID, "retries", retries) 205 380 time.Sleep(backoff(retries, 10)) 206 381 retries++ 207 382 continue 208 383 } 209 384 210 - // Success - ack the event 211 - o.AckEvent(eventID) 385 + o.AckEvent(evt.ID) 212 386 return 213 387 } 214 388 } ··· 240 414 } 241 415 242 416 func (o *Outbox) checkTimeouts(ctx context.Context) { 243 - ticker := time.NewTicker(1 * time.Second) 417 + ticker := time.NewTicker(10 * time.Second) 244 418 defer ticker.Stop() 245 419 246 420 for { ··· 253 427 } 254 428 } 255 429 256 - // this just clears them out of tracking which will allow them to be retried on the next deliverPend 430 + // retryTimedOutEvents iterates through all workers and re-queues timed out events 257 431 func (o *Outbox) retryTimedOutEvents() { 258 - o.inFlightMu.Lock() 259 - defer o.inFlightMu.Unlock() 432 + // Get snapshot of all active workers 433 + o.workersMu.Lock() 434 + workers := make([]*DIDWorker, 0, len(o.didWorkers)) 435 + for _, w := range o.didWorkers { 436 + workers = append(workers, w) 437 + } 438 + o.workersMu.Unlock() 260 439 261 - now := time.Now() 262 - for id, inFlight := range o.inFlightEvents { 263 - if now.Sub(inFlight.SentAt) > 10*time.Second { 264 - o.logger.Info("event timed out, resending", "id", id, "did", inFlight.DID, "live", inFlight.Live) 265 - 266 - // Decrement the appropriate counter 267 - did := inFlight.DID 268 - counts := o.inFlightDIDs[did] 269 - if inFlight.Live { 270 - counts.LiveCount-- 271 - } else { 272 - counts.HistoricalCount-- 440 + for _, worker := range workers { 441 + timedOutIDs := worker.timedOutEvents() 442 + for _, id := range timedOutIDs { 443 + o.cacheMu.RLock() 444 + evt, exists := o.eventCache[id] 445 + o.cacheMu.RUnlock() 446 + if exists { 447 + o.logger.Info("retrying timed out event", "id", id) 448 + o.sendEvent(evt) 273 449 } 274 - 275 - // If no more events for this DID, remove the key 276 - if counts.LiveCount == 0 && counts.HistoricalCount == 0 { 277 - delete(o.inFlightDIDs, did) 278 - } 279 - 280 - delete(o.inFlightEvents, id) 281 450 } 282 451 } 283 452 }
-4
nexus/processor.go
··· 111 111 return err 112 112 } 113 113 114 - ep.Outbox.Notify() 115 114 return nil 116 115 } 117 116 ··· 277 276 return err 278 277 } 279 278 280 - ep.Outbox.Notify() 281 279 return nil 282 280 } 283 281 ··· 344 342 } 345 343 } 346 344 347 - ep.Outbox.Notify() 348 345 return nil 349 346 } 350 347 ··· 398 395 return err 399 396 } 400 397 401 - ep.Outbox.Notify() 402 398 } 403 399 404 400 ep.Logger.Info("processed buffered resync events", "did", did, "count", len(bufferedEvts))
+3 -6
nexus/resync.go
··· 46 46 defer n.claimJobMu.Unlock() 47 47 48 48 var did string 49 - now := time.Now().Unix() 50 49 result := n.db.Raw(` 51 50 UPDATE repos 52 51 SET state = ? 53 52 WHERE did = ( 54 53 SELECT did FROM repos 55 54 WHERE state IN (?, ?) 56 - AND retry_after <= ? 57 - ORDER BY retry_after ASC 55 + ORDER BY RANDOM() 58 56 LIMIT 1 59 57 ) 60 58 RETURNING did 61 - `, models.RepoStateResyncing, models.RepoStatePending, models.RepoStateDesynced, now).Scan(&did) 59 + `, models.RepoStateResyncing, models.RepoStatePending, models.RepoStateDesynced).Scan(&did) 62 60 if result.Error != nil { 63 61 return "", false, result.Error 64 62 } ··· 269 267 return err 270 268 } 271 269 272 - n.outbox.Notify() 273 270 return nil 274 271 } 275 272 ··· 298 295 "state": state, 299 296 "error_msg": errMsg, 300 297 "retry_count": repo.RetryCount + 1, 301 - "retry_after": retryAfter.Unix(), 298 + "retry_after": retryAfter, 302 299 }).Error 303 300 if dbErr != nil { 304 301 return dbErr