this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

batch delete acked events

dholms 4407d314 0e8056fb

+149 -112
+149 -112
nexus/outbox.go
··· 58 58 didWorkers map[string]*DIDWorker 59 59 workersMu sync.Mutex 60 60 61 + ackQueue chan uint 62 + 61 63 ctx context.Context 62 64 } 63 65 ··· 74 76 eventCache: make(map[uint]*OutboxEvt), 75 77 pendingIDs: make(chan uint, 1000000), 76 78 didWorkers: make(map[string]*DIDWorker), 79 + ackQueue: make(chan uint, 100000), 77 80 } 78 81 } 79 82 80 83 func (o *Outbox) Run(ctx context.Context) { 81 84 o.ctx = ctx 82 85 83 - // Webhook mode doesn't need timeout checker since sendWebhook() retries indefinitely 84 86 if o.mode == OutboxModeWebsocketAck { 85 87 go o.checkTimeouts(ctx) 86 88 } 87 89 go o.runCacheLoader(ctx) 88 90 go o.runDelivery(ctx) 91 + go o.runBatchedDeletes(ctx) 89 92 90 93 <-ctx.Done() 91 94 } ··· 201 204 } 202 205 } 203 206 207 + func (o *Outbox) AckEvent(eventID uint) { 208 + o.cacheMu.RLock() 209 + outboxEvt, exists := o.eventCache[eventID] 210 + o.cacheMu.RUnlock() 211 + 212 + if exists { 213 + did := outboxEvt.DID() 214 + 215 + o.workersMu.Lock() 216 + worker := o.didWorkers[did] 217 + o.workersMu.Unlock() 218 + 219 + if worker != nil { 220 + worker.ackEvent(eventID) 221 + } 222 + 223 + o.cacheMu.Lock() 224 + delete(o.eventCache, eventID) 225 + o.cacheMu.Unlock() 226 + } 227 + 228 + select { 229 + case o.ackQueue <- eventID: 230 + default: 231 + o.logger.Warn("ack queue full, deleting synchronously", "id", eventID) 232 + if err := o.db.Delete(&models.OutboxBuffer{}, eventID).Error; err != nil { 233 + o.logger.Error("failed to delete acked event", "error", err, "id", eventID) 234 + } 235 + } 236 + } 237 + 238 + func (o *Outbox) sendWebhook(evt *OutboxEvt) { 239 + retries := 0 240 + for { 241 + if err := o.postWebhook(evt); err != nil { 242 + o.logger.Warn("webhook failed, retrying", "error", err, "id", evt.ID, "retries", retries) 243 + time.Sleep(backoff(retries, 10)) 244 + retries++ 245 + continue 246 + } 247 + 248 + o.AckEvent(evt.ID) 249 + return 250 + } 251 + } 252 + 253 + func (o *Outbox) postWebhook(evt *OutboxEvt) error { 254 + body, err := json.Marshal(evt) 255 + if err != nil { 256 + return fmt.Errorf("failed to marshal event: %w", err) 257 + } 258 + 259 + req, err := http.NewRequest("POST", o.webhookURL, bytes.NewReader(body)) 260 + if err != nil { 261 + return fmt.Errorf("failed to create request: %w", err) 262 + } 263 + 264 + req.Header.Set("Content-Type", "application/json") 265 + 266 + resp, err := o.httpClient.Do(req) 267 + if err != nil { 268 + return fmt.Errorf("failed to send request: %w", err) 269 + } 270 + defer resp.Body.Close() 271 + 272 + if resp.StatusCode < 200 || resp.StatusCode >= 300 { 273 + return fmt.Errorf("webhook returned non-2xx status: %d", resp.StatusCode) 274 + } 275 + 276 + return nil 277 + } 278 + 279 + func (o *Outbox) runBatchedDeletes(ctx context.Context) { 280 + ticker := time.NewTicker(time.Second) 281 + defer ticker.Stop() 282 + 283 + var batch []uint 284 + 285 + for { 286 + select { 287 + case <-ctx.Done(): 288 + return 289 + case id := <-o.ackQueue: 290 + batch = append(batch, id) 291 + if len(batch) >= 100 { 292 + o.flushDeleteBatch(batch) 293 + batch = nil 294 + } 295 + case <-ticker.C: 296 + if len(batch) > 0 { 297 + o.flushDeleteBatch(batch) 298 + batch = nil 299 + } 300 + } 301 + } 302 + } 303 + 304 + func (o *Outbox) flushDeleteBatch(ids []uint) { 305 + if len(ids) == 0 { 306 + return 307 + } 308 + 309 + if err := o.db.Delete(&models.OutboxBuffer{}, ids).Error; err != nil { 310 + o.logger.Error("failed to delete batch of acked events", "error", err, "count", len(ids)) 311 + } 312 + } 313 + 314 + func (o *Outbox) checkTimeouts(ctx context.Context) { 315 + ticker := time.NewTicker(10 * time.Second) 316 + defer ticker.Stop() 317 + 318 + for { 319 + select { 320 + case <-ctx.Done(): 321 + return 322 + case <-ticker.C: 323 + o.retryTimedOutEvents() 324 + } 325 + } 326 + } 327 + 328 + // retryTimedOutEvents iterates through all workers and re-queues timed out events 329 + func (o *Outbox) retryTimedOutEvents() { 330 + // Get snapshot of all active workers 331 + o.workersMu.Lock() 332 + workers := make([]*DIDWorker, 0, len(o.didWorkers)) 333 + for _, w := range o.didWorkers { 334 + workers = append(workers, w) 335 + } 336 + o.workersMu.Unlock() 337 + 338 + for _, worker := range workers { 339 + timedOutIDs := worker.timedOutEvents() 340 + for _, id := range timedOutIDs { 341 + o.cacheMu.RLock() 342 + evt, exists := o.eventCache[id] 343 + o.cacheMu.RUnlock() 344 + if exists { 345 + o.logger.Info("retrying timed out event", "id", id) 346 + o.sendEvent(evt) 347 + } 348 + } 349 + } 350 + } 351 + 204 352 func (w *DIDWorker) run() { 205 353 for { 206 354 select { ··· 337 485 338 486 return timedOut 339 487 } 340 - 341 - func (o *Outbox) AckEvent(eventID uint) { 342 - o.cacheMu.RLock() 343 - outboxEvt, exists := o.eventCache[eventID] 344 - o.cacheMu.RUnlock() 345 - 346 - if !exists { 347 - // evt not in cache, may have already been acked - still try to delete from DB 348 - if err := o.db.Delete(&models.OutboxBuffer{}, eventID).Error; err != nil { 349 - o.logger.Error("failed to delete acked event", "error", err, "id", eventID) 350 - } 351 - return 352 - } 353 - 354 - did := outboxEvt.DID() 355 - 356 - o.workersMu.Lock() 357 - worker := o.didWorkers[did] 358 - o.workersMu.Unlock() 359 - 360 - if worker != nil { 361 - worker.ackEvent(eventID) 362 - } 363 - 364 - o.cacheMu.Lock() 365 - delete(o.eventCache, eventID) 366 - o.cacheMu.Unlock() 367 - 368 - if err := o.db.Delete(&models.OutboxBuffer{}, eventID).Error; err != nil { 369 - o.logger.Error("failed to delete acked event", "error", err, "id", eventID) 370 - } 371 - } 372 - 373 - func (o *Outbox) sendWebhook(evt *OutboxEvt) { 374 - retries := 0 375 - for { 376 - if err := o.postWebhook(evt); err != nil { 377 - o.logger.Warn("webhook failed, retrying", "error", err, "id", evt.ID, "retries", retries) 378 - time.Sleep(backoff(retries, 10)) 379 - retries++ 380 - continue 381 - } 382 - 383 - o.AckEvent(evt.ID) 384 - return 385 - } 386 - } 387 - 388 - func (o *Outbox) postWebhook(evt *OutboxEvt) error { 389 - body, err := json.Marshal(evt) 390 - if err != nil { 391 - return fmt.Errorf("failed to marshal event: %w", err) 392 - } 393 - 394 - req, err := http.NewRequest("POST", o.webhookURL, bytes.NewReader(body)) 395 - if err != nil { 396 - return fmt.Errorf("failed to create request: %w", err) 397 - } 398 - 399 - req.Header.Set("Content-Type", "application/json") 400 - 401 - resp, err := o.httpClient.Do(req) 402 - if err != nil { 403 - return fmt.Errorf("failed to send request: %w", err) 404 - } 405 - defer resp.Body.Close() 406 - 407 - if resp.StatusCode < 200 || resp.StatusCode >= 300 { 408 - return fmt.Errorf("webhook returned non-2xx status: %d", resp.StatusCode) 409 - } 410 - 411 - return nil 412 - } 413 - 414 - func (o *Outbox) checkTimeouts(ctx context.Context) { 415 - ticker := time.NewTicker(10 * time.Second) 416 - defer ticker.Stop() 417 - 418 - for { 419 - select { 420 - case <-ctx.Done(): 421 - return 422 - case <-ticker.C: 423 - o.retryTimedOutEvents() 424 - } 425 - } 426 - } 427 - 428 - // retryTimedOutEvents iterates through all workers and re-queues timed out events 429 - func (o *Outbox) retryTimedOutEvents() { 430 - // Get snapshot of all active workers 431 - o.workersMu.Lock() 432 - workers := make([]*DIDWorker, 0, len(o.didWorkers)) 433 - for _, w := range o.didWorkers { 434 - workers = append(workers, w) 435 - } 436 - o.workersMu.Unlock() 437 - 438 - for _, worker := range workers { 439 - timedOutIDs := worker.timedOutEvents() 440 - for _, id := range timedOutIDs { 441 - o.cacheMu.RLock() 442 - evt, exists := o.eventCache[id] 443 - o.cacheMu.RUnlock() 444 - if exists { 445 - o.logger.Info("retrying timed out event", "id", id) 446 - o.sendEvent(evt) 447 - } 448 - } 449 - } 450 - }