this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

queue backfills

dholms 7657a100 309f20e3

+92 -54
+6 -12
nexus/backfill.go
··· 16 16 "github.com/ipfs/go-cid" 17 17 ) 18 18 19 - func (n *Nexus) backfillDid(ctx context.Context, did string) { 19 + func (n *Nexus) backfillDid(ctx context.Context, did string) error { 20 20 if err := n.UpdateRepoState(did, models.RepoStateBackfilling, "", ""); err != nil { 21 - n.logger.Error("failed to update state to backfilling", "error", err, "did", did) 22 - return 21 + return fmt.Errorf("failed to update state to backfilling: %w", err) 23 22 } 24 23 25 24 n.logger.Info("starting backfill", "did", did) 26 25 27 26 rev, err := n.backfillRepo(ctx, did) 28 27 if err != nil { 29 - n.logger.Error("backfill failed", "error", err, "did", did) 30 - if updateErr := n.UpdateRepoState(did, models.RepoStateError, "", err.Error()); updateErr != nil { 31 - n.logger.Error("failed to update state to error", "error", updateErr, "did", did) 32 - } 33 - return 28 + n.UpdateRepoState(did, models.RepoStateError, "", err.Error()) 29 + return err 34 30 } 35 31 36 32 if err := n.UpdateRepoState(did, models.RepoStateActive, rev, ""); err != nil { 37 - n.logger.Error("failed to update state to active", "error", err, "did", did) 38 - return 33 + return fmt.Errorf("failed to update state to active %w", err) 39 34 } 40 - 41 - n.logger.Info("backfill complete", "did", did, "rev", rev) 35 + return nil 42 36 } 43 37 44 38 func (n *Nexus) backfillRepo(ctx context.Context, did string) (string, error) {
+15 -33
nexus/handlers.go
··· 1 1 package main 2 2 3 3 import ( 4 - "context" 5 4 "net/http" 6 5 7 6 "github.com/bluesky-social/indigo/nexus/models" ··· 51 50 return err 52 51 } 53 52 54 - tx := n.db.Begin() 55 - defer tx.Rollback() 56 - 57 - var newDids []string 58 - 59 - for _, did := range payload.DIDs { 60 - var existing models.FilterDid 61 - err := tx.First(&existing, "did = ?", did).Error 62 - 63 - if err == nil { 64 - n.logger.Info("did already tracked", "did", did, "state", existing.State) 65 - continue 66 - } 67 - 68 - filterDid := &models.FilterDid{ 53 + filterDids := make([]models.FilterDid, len(payload.DIDs)) 54 + for i, did := range payload.DIDs { 55 + filterDids[i] = models.FilterDid{ 69 56 Did: did, 70 57 State: models.RepoStatePending, 71 58 } 72 - if err := tx.Create(filterDid).Error; err != nil { 73 - n.logger.Error("failed to insert did", "error", err, "did", did) 74 - return echo.NewHTTPError(http.StatusInternalServerError) 75 - } 59 + } 76 60 77 - n.mu.Lock() 78 - n.filterDids[did] = true 79 - n.mu.Unlock() 80 - 81 - newDids = append(newDids, did) 61 + if err := n.db.Save(&filterDids).Error; err != nil { 62 + n.logger.Error("failed to upsert dids", "error", err) 63 + return echo.NewHTTPError(http.StatusInternalServerError) 82 64 } 83 65 84 - if err := tx.Commit().Error; err != nil { 85 - n.logger.Error("failed to commit transaction", "error", err) 86 - return echo.NewHTTPError(http.StatusInternalServerError) 66 + n.mu.Lock() 67 + for _, did := range payload.DIDs { 68 + n.filterDids[did] = true 87 69 } 70 + n.mu.Unlock() 88 71 89 - for _, did := range newDids { 90 - go n.backfillDid(context.Background(), did) 72 + for _, did := range payload.DIDs { 73 + n.queueBackfill(did) 91 74 } 92 75 93 - n.logger.Info("added dids and started backfills", "new", len(newDids), "total", len(payload.DIDs)) 76 + n.logger.Info("added dids and queued backfills", "count", len(payload.DIDs)) 94 77 95 78 return c.JSON(http.StatusOK, map[string]interface{}{ 96 - "added": len(newDids), 97 - "total": len(payload.DIDs), 79 + "count": len(payload.DIDs), 98 80 }) 99 81 }
+25 -9
nexus/nexus.go
··· 21 21 filterDids map[string]bool // DID -> exists (for quick filtering) 22 22 mu sync.RWMutex 23 23 24 - // for signature verification 25 24 Dir identity.Directory 26 25 27 - outbox *Outbox 26 + outbox *Outbox 27 + backfillQueue *BackfillQueue 28 28 } 29 29 30 30 type Op struct { ··· 69 69 70 70 Dir: &cdir, 71 71 72 - outbox: NewOutbox(db), 72 + outbox: NewOutbox(db), 73 + backfillQueue: NewBackfillQueue(), 74 + } 75 + 76 + // run 50 backfill workers 77 + for i := 0; i < 50; i++ { 78 + go n.runBackfillWorker(i) 73 79 } 74 80 75 81 err = n.LoadFilters() ··· 117 123 n.filterDids[f.Did] = true 118 124 119 125 if f.State == models.RepoStatePending || f.State == models.RepoStateBackfilling { 120 - go n.backfillDid(context.Background(), f.Did) 126 + n.queueBackfill(f.Did) 121 127 } 122 128 } 123 129 124 130 return nil 125 131 } 126 132 127 - func (n *Nexus) GetRepoState(did string) (models.RepoState, error) { 128 - var filterDid models.FilterDid 129 - if err := n.db.First(&filterDid, "did = ?", did).Error; err != nil { 130 - return "", err 133 + func (n *Nexus) runBackfillWorker(workerID int) { 134 + logger := n.logger.With("worker", workerID) 135 + 136 + for { 137 + did := n.backfillQueue.Dequeue() 138 + logger.Info("processing backfill", "did", did) 139 + err := n.backfillDid(context.Background(), did) 140 + if err != nil { 141 + logger.Error("error backfilling did", "error", err, "did", did) 142 + } 131 143 } 132 - return filterDid.State, nil 144 + } 145 + 146 + func (n *Nexus) queueBackfill(did string) { 147 + depth := n.backfillQueue.Enqueue(did) 148 + n.logger.Info("queued backfill", "did", did, "queue_depth", depth) 133 149 } 134 150 135 151 func (n *Nexus) UpdateRepoState(did string, state models.RepoState, rev string, errorMsg string) error {
+46
nexus/queue.go
··· 1 + package main 2 + 3 + import "sync" 4 + 5 + type BackfillQueue struct { 6 + queue []string 7 + head int 8 + mu sync.Mutex 9 + cond *sync.Cond 10 + } 11 + 12 + func NewBackfillQueue() *BackfillQueue { 13 + q := &BackfillQueue{ 14 + queue: make([]string, 0), 15 + head: 0, 16 + } 17 + q.cond = sync.NewCond(&q.mu) 18 + return q 19 + } 20 + 21 + func (q *BackfillQueue) Enqueue(did string) int { 22 + q.mu.Lock() 23 + q.queue = append(q.queue, did) 24 + depth := len(q.queue) - q.head 25 + q.mu.Unlock() 26 + q.cond.Signal() 27 + return depth 28 + } 29 + 30 + func (q *BackfillQueue) Dequeue() string { 31 + q.mu.Lock() 32 + for q.head >= len(q.queue) { 33 + q.cond.Wait() 34 + } 35 + did := q.queue[q.head] 36 + q.head++ 37 + 38 + // trim queue every 10000 elements 39 + if q.head > 10000 { 40 + q.queue = append([]string(nil), q.queue[q.head:]...) 41 + q.head = 0 42 + } 43 + q.mu.Unlock() 44 + 45 + return did 46 + }