this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor evt cache

dholms ed34dd65 cb9d4eb1

+104 -84
+104 -84
cmd/nexus/outbox.go
··· 50 50 webhookURL string 51 51 httpClient *http.Client 52 52 53 - eventCache map[uint]*OutboxEvt // ID -> event 54 - pendingIDs chan uint // Buffered channel of event IDs 55 - cacheMu sync.RWMutex 56 - lastLoadedID uint 53 + cache *EventCache 57 54 58 55 didWorkers map[string]*DIDWorker 59 56 workersMu sync.Mutex ··· 64 61 } 65 62 66 63 func NewOutbox(db *gorm.DB, mode OutboxMode, webhookURL string) *Outbox { 64 + logger := slog.Default().With("system", "outbox") 67 65 return &Outbox{ 68 66 db: db, 69 67 logger: slog.Default().With("system", "outbox"), ··· 73 71 httpClient: &http.Client{ 74 72 Timeout: 30 * time.Second, 75 73 }, 76 - eventCache: make(map[uint]*OutboxEvt), 77 - pendingIDs: make(chan uint, 1000000), 74 + cache: NewEventCache(db, logger, 1000, 1000000), 78 75 didWorkers: make(map[string]*DIDWorker), 79 76 ackQueue: make(chan uint, 100000), 80 77 } ··· 87 84 if o.mode == OutboxModeWebsocketAck { 88 85 go o.checkTimeouts(ctx) 89 86 } 90 - go o.runCacheLoader(ctx) 87 + go o.cache.run(ctx) 91 88 go o.runDelivery(ctx) 92 89 go o.runBatchedDeletes(ctx) 93 90 94 91 <-ctx.Done() 95 92 } 96 93 97 - // continuously load events from DB into memory cache 98 - func (o *Outbox) runCacheLoader(ctx context.Context) { 99 - for { 100 - select { 101 - case <-ctx.Done(): 102 - return 103 - default: 104 - loaded := o.loadMoreEvents() 105 - if !loaded { 106 - time.Sleep(500 * time.Millisecond) 107 - } 108 - } 109 - } 110 - } 111 - 112 - func (o *Outbox) loadMoreEvents() bool { 113 - o.cacheMu.RLock() 114 - lastID := o.lastLoadedID 115 - o.cacheMu.RUnlock() 116 - 117 - batchSize := 1000 118 - 119 - var events []models.OutboxBuffer 120 - if err := o.db.Where("id > ?", lastID). 121 - Order("id ASC"). 122 - Limit(batchSize). 123 - Find(&events).Error; err != nil { 124 - o.logger.Error("failed to load events into cache", "error", err, "lastID", lastID) 125 - return false 126 - } 127 - 128 - if len(events) == 0 { 129 - return false 130 - } 131 - 132 - o.cacheMu.Lock() 133 - for _, dbEvt := range events { 134 - var outboxEvt OutboxEvt 135 - if err := json.Unmarshal([]byte(dbEvt.Data), &outboxEvt); err != nil { 136 - o.logger.Error("failed to unmarshal cached event", "error", err, "id", dbEvt.ID) 137 - continue 138 - } 139 - 140 - outboxEvt.ID = dbEvt.ID 141 - o.eventCache[dbEvt.ID] = &outboxEvt 142 - 143 - if dbEvt.ID > o.lastLoadedID { 144 - o.lastLoadedID = dbEvt.ID 145 - } 146 - } 147 - o.cacheMu.Unlock() 148 - 149 - // do outside of cacheMu lock to avoid holding the lcok in the case of back pressure 150 - for _, dbEvt := range events { 151 - // back pressure if pendingIDs channel is full 152 - o.pendingIDs <- dbEvt.ID 153 - } 154 - return true 155 - } 156 - 157 94 // runDelivery continuously pulls from pendingIDs and delivers events 158 95 func (o *Outbox) runDelivery(ctx context.Context) { 159 96 for { 160 97 select { 161 98 case <-ctx.Done(): 162 99 return 163 - case eventID := <-o.pendingIDs: 100 + case eventID := <-o.cache.pendingIDs: 164 101 o.deliverEvent(eventID) 165 102 } 166 103 } 167 104 } 168 105 169 106 func (o *Outbox) deliverEvent(eventID uint) { 170 - o.cacheMu.RLock() 171 - outboxEvt, exists := o.eventCache[eventID] 172 - o.cacheMu.RUnlock() 173 - 107 + outboxEvt, exists := o.cache.GetEvent(eventID) 174 108 if !exists { 175 109 // Event was already acked/removed 176 110 return ··· 208 142 209 143 // AckEvent marks an event as delivered and queues it for deletion. 210 144 func (o *Outbox) AckEvent(eventID uint) { 211 - o.cacheMu.RLock() 212 - outboxEvt, exists := o.eventCache[eventID] 213 - o.cacheMu.RUnlock() 145 + outboxEvt, exists := o.cache.GetEvent(eventID) 214 146 215 147 if exists { 216 148 did := outboxEvt.DID() ··· 223 155 worker.ackEvent(eventID) 224 156 } 225 157 226 - o.cacheMu.Lock() 227 - delete(o.eventCache, eventID) 228 - o.cacheMu.Unlock() 158 + o.cache.DeleteEvent(eventID) 229 159 } 230 160 231 161 select { ··· 342 272 for _, worker := range workers { 343 273 timedOutIDs := worker.timedOutEvents() 344 274 for _, id := range timedOutIDs { 345 - o.cacheMu.RLock() 346 - evt, exists := o.eventCache[id] 347 - o.cacheMu.RUnlock() 275 + evt, exists := o.cache.GetEvent(id) 348 276 if exists { 349 277 o.logger.Info("retrying timed out event", "id", id) 350 278 o.sendEvent(evt) ··· 404 332 eventID := w.pendingEvts[0] 405 333 w.mu.Unlock() 406 334 407 - w.outbox.cacheMu.RLock() 408 - outboxEvt, exists := w.outbox.eventCache[eventID] 409 - w.outbox.cacheMu.RUnlock() 335 + outboxEvt, exists := w.outbox.cache.GetEvent(eventID) 410 336 411 337 if !exists { 412 338 // Event was already acked/removed, skip it ··· 496 422 497 423 return timedOut 498 424 } 425 + 426 + type EventCache struct { 427 + db *gorm.DB 428 + logger *slog.Logger 429 + 430 + batchSize int 431 + 432 + eventCache map[uint]*OutboxEvt 433 + cacheMu sync.RWMutex 434 + 435 + pendingIDs chan uint 436 + dbEvtChan chan *models.OutboxBuffer // internal channel 437 + } 438 + 439 + func NewEventCache(db *gorm.DB, logger *slog.Logger, batchSize int, pendingSize int) *EventCache { 440 + return &EventCache{ 441 + db: db, 442 + logger: logger, 443 + batchSize: batchSize, 444 + eventCache: make(map[uint]*OutboxEvt), 445 + pendingIDs: make(chan uint, pendingSize), 446 + dbEvtChan: make(chan *models.OutboxBuffer, batchSize*2), 447 + } 448 + } 449 + 450 + func (ec *EventCache) run(ctx context.Context) { 451 + go ec.loadEvents(ctx) 452 + go ec.processEvents(ctx) 453 + } 454 + 455 + func (ec *EventCache) loadEvents(ctx context.Context) { 456 + var lastID uint 457 + for { 458 + select { 459 + case <-ctx.Done(): 460 + return 461 + default: 462 + var events []models.OutboxBuffer 463 + if err := ec.db.Where("id > ?", lastID). 464 + Order("id ASC"). 465 + Limit(ec.batchSize). 466 + Find(&events).Error; err != nil { 467 + ec.logger.Error("failed to load events into cache", "error", err, "lastID", lastID) 468 + time.Sleep(500 * time.Millisecond) 469 + continue 470 + } 471 + if len(events) == 0 { 472 + time.Sleep(500 * time.Millisecond) 473 + continue 474 + } 475 + for _, evt := range events { 476 + ec.dbEvtChan <- &evt 477 + } 478 + lastID = events[len(events)-1].ID 479 + } 480 + } 481 + } 482 + 483 + func (ec *EventCache) processEvents(ctx context.Context) { 484 + for { 485 + select { 486 + case <-ctx.Done(): 487 + return 488 + case evt := <-ec.dbEvtChan: 489 + var outboxEvt OutboxEvt 490 + if err := json.Unmarshal([]byte(evt.Data), &outboxEvt); err != nil { 491 + ec.logger.Error("failed to unmarshal cached event", "error", err, "id", evt.ID) 492 + continue 493 + } 494 + 495 + outboxEvt.ID = evt.ID 496 + 497 + ec.cacheMu.Lock() 498 + ec.eventCache[evt.ID] = &outboxEvt 499 + ec.cacheMu.Unlock() 500 + 501 + // back pressure if pendingIDs channel is full 502 + ec.pendingIDs <- evt.ID 503 + } 504 + } 505 + } 506 + 507 + func (ec *EventCache) GetEvent(id uint) (*OutboxEvt, bool) { 508 + ec.cacheMu.RLock() 509 + defer ec.cacheMu.RUnlock() 510 + evt, exists := ec.eventCache[id] 511 + return evt, exists 512 + } 513 + 514 + func (ec *EventCache) DeleteEvent(id uint) { 515 + ec.cacheMu.Lock() 516 + defer ec.cacheMu.Unlock() 517 + delete(ec.eventCache, id) 518 + }