this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

trying to speed up outbox

dholms 61bf02fe ed34dd65

+38 -33
+36 -31
cmd/nexus/outbox.go
··· 52 52 53 53 cache *EventCache 54 54 55 - didWorkers map[string]*DIDWorker 56 - workersMu sync.Mutex 55 + didWorkers sync.Map // map[string]*DIDWorker 57 56 58 57 ackQueue chan uint 59 58 ··· 71 70 httpClient: &http.Client{ 72 71 Timeout: 30 * time.Second, 73 72 }, 74 - cache: NewEventCache(db, logger, 1000, 1000000), 75 - didWorkers: make(map[string]*DIDWorker), 76 - ackQueue: make(chan uint, 100000), 73 + cache: NewEventCache(db, logger, 1000, 1000000), 74 + ackQueue: make(chan uint, 100000), 77 75 } 78 76 } 79 77 ··· 85 83 go o.checkTimeouts(ctx) 86 84 } 87 85 go o.cache.run(ctx) 88 - go o.runDelivery(ctx) 86 + 87 + // Run multiple delivery workers for parallelism across DIDs 88 + for i := 0; i < 20; i++ { 89 + go o.runDelivery(ctx) 90 + } 91 + 89 92 go o.runBatchedDeletes(ctx) 90 93 91 94 <-ctx.Done() ··· 112 115 113 116 did := outboxEvt.DID() 114 117 115 - o.workersMu.Lock() 116 - worker, exists := o.didWorkers[did] 117 - if !exists { 118 - worker = &DIDWorker{ 119 - did: did, 120 - notifChan: make(chan struct{}, 1), 121 - inFlightSentAt: make(map[uint]time.Time), 122 - outbox: o, 123 - ctx: o.ctx, 124 - } 125 - o.didWorkers[did] = worker 118 + // Fast path: try to load existing worker 119 + if val, ok := o.didWorkers.Load(did); ok { 120 + worker := val.(*DIDWorker) 121 + worker.addEvent(outboxEvt) 122 + return 126 123 } 127 - o.workersMu.Unlock() 128 124 129 - worker.addEvent(outboxEvt) 125 + // Slow path: create new worker 126 + worker := &DIDWorker{ 127 + did: did, 128 + notifChan: make(chan struct{}, 1), 129 + inFlightSentAt: make(map[uint]time.Time), 130 + outbox: o, 131 + ctx: o.ctx, 132 + } 133 + actual, _ := o.didWorkers.LoadOrStore(did, worker) 134 + actual.(*DIDWorker).addEvent(outboxEvt) 130 135 } 131 136 132 137 func (o *Outbox) sendEvent(evt *OutboxEvt) { ··· 147 152 if exists { 148 153 did := outboxEvt.DID() 149 154 150 - o.workersMu.Lock() 151 - worker := o.didWorkers[did] 152 - o.workersMu.Unlock() 153 - 154 - if worker != nil { 155 + if val, ok := o.didWorkers.Load(did); ok { 156 + worker := val.(*DIDWorker) 155 157 worker.ackEvent(eventID) 156 158 } 157 159 ··· 262 264 // retryTimedOutEvents iterates through all workers and re-queues timed out events 263 265 func (o *Outbox) retryTimedOutEvents() { 264 266 // Get snapshot of all active workers 265 - o.workersMu.Lock() 266 - workers := make([]*DIDWorker, 0, len(o.didWorkers)) 267 - for _, w := range o.didWorkers { 268 - workers = append(workers, w) 269 - } 270 - o.workersMu.Unlock() 267 + workers := make([]*DIDWorker, 0) 268 + o.didWorkers.Range(func(key, value interface{}) bool { 269 + workers = append(workers, value.(*DIDWorker)) 270 + return true 271 + }) 271 272 272 273 for _, worker := range workers { 273 274 timedOutIDs := worker.timedOutEvents() ··· 449 450 450 451 func (ec *EventCache) run(ctx context.Context) { 451 452 go ec.loadEvents(ctx) 452 - go ec.processEvents(ctx) 453 + 454 + // Run multiple processors for parallel JSON unmarshaling 455 + for i := 0; i < 4; i++ { 456 + go ec.processEvents(ctx) 457 + } 453 458 } 454 459 455 460 func (ec *EventCache) loadEvents(ctx context.Context) {
+2 -2
cmd/nexus/server.go
··· 73 73 74 74 // Process acks directly in websocket-ack mode 75 75 if ns.Outbox.mode == OutboxModeWebsocketAck { 76 - ns.Outbox.AckEvent(msg.ID) 76 + go ns.Outbox.AckEvent(msg.ID) 77 77 } 78 78 } 79 79 }() ··· 94 94 // In fire-and-forget mode, ack immediately after write succeeds 95 95 // In websocket-ack mode, wait for client to send ack and handle in read loop 96 96 if ns.Outbox.mode == OutboxModeFireAndForget { 97 - ns.Outbox.AckEvent(evt.ID) 97 + go ns.Outbox.AckEvent(evt.ID) 98 98 } 99 99 } 100 100 }