this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

move from backfill -> resync

dholms b8f76dfb 04b41044

+52 -40
+19 -19
nexus/backfill.go nexus/resync.go
··· 17 17 "gorm.io/gorm" 18 18 ) 19 19 20 - func (n *Nexus) runBackfillWorker(ctx context.Context, workerID int) { 20 + func (n *Nexus) runResyncWorker(ctx context.Context, workerID int) { 21 21 logger := n.logger.With("worker", workerID) 22 22 23 23 for { 24 - did, found, err := n.claimBackfillJob(ctx) 24 + did, found, err := n.claimResyncJob(ctx) 25 25 if err != nil { 26 - logger.Error("failed to claim backfill job", "error", err) 26 + logger.Error("failed to claim resync job", "error", err) 27 27 time.Sleep(3 * time.Second) 28 28 continue 29 29 } ··· 33 33 continue 34 34 } 35 35 36 - logger.Info("processing backfill", "did", did) 37 - err = n.backfillDid(ctx, did) 36 + logger.Info("processing resync", "did", did) 37 + err = n.resyncDid(ctx, did) 38 38 if err != nil { 39 - logger.Error("backfill failed", "did", did, "error", err) 39 + logger.Error("resync failed", "did", did, "error", err) 40 40 } 41 41 } 42 42 } 43 43 44 - func (n *Nexus) claimBackfillJob(ctx context.Context) (string, bool, error) { 44 + func (n *Nexus) claimResyncJob(ctx context.Context) (string, bool, error) { 45 45 var did models.Repo 46 46 err := n.db.Transaction(func(tx *gorm.DB) error { 47 - if err := tx.Where("state = ?", models.RepoStatePending). 47 + if err := tx.Where("state IN (?)", []models.RepoState{models.RepoStatePending, models.RepoStateDesynced}). 48 48 First(&did).Error; err != nil { 49 49 return err 50 50 } 51 51 52 52 return tx.Model(&models.Repo{}). 53 53 Where("did = ?", did.Did). 54 - Update("state", models.RepoStateBackfilling).Error 54 + Update("state", models.RepoStateResyncing).Error 55 55 }) 56 56 57 57 if err != nil { ··· 64 64 return did.Did, true, nil 65 65 } 66 66 67 - func (n *Nexus) backfillDid(ctx context.Context, did string) error { 68 - n.logger.Info("starting backfill", "did", did) 67 + func (n *Nexus) resyncDid(ctx context.Context, did string) error { 68 + n.logger.Info("starting resync", "did", did) 69 69 70 - err := n.doBackfill(ctx, did) 70 + err := n.doResync(ctx, did) 71 71 if err != nil { 72 72 n.db.Model(&models.Repo{}). 73 73 Where("did = ?", did). ··· 80 80 return err 81 81 } 82 82 83 - if err := n.EventProcessor.drainBackfillBuffer(ctx, did); err != nil { 84 - n.logger.Error("failed to drain backfill buffer events", "did", did, "error", err) 83 + if err := n.EventProcessor.drainResyncBuffer(ctx, did); err != nil { 84 + n.logger.Error("failed to drain resync buffer events", "did", did, "error", err) 85 85 } 86 86 87 87 return nil 88 88 } 89 89 90 - func (n *Nexus) doBackfill(ctx context.Context, did string) error { 90 + func (n *Nexus) doResync(ctx context.Context, did string) error { 91 91 ident, err := n.Dir.LookupDID(ctx, syntax.DID(did)) 92 92 if err != nil { 93 93 return fmt.Errorf("failed to resolve DID: %w", err) ··· 210 210 return fmt.Errorf("failed to update repo state to active %w", err) 211 211 } 212 212 213 - n.logger.Info("backfill repo complete", "did", did, "records", numRecords, "rev", rev) 213 + n.logger.Info("resync repo complete", "did", did, "records", numRecords, "rev", rev) 214 214 return nil 215 215 } 216 216 217 - func (n *Nexus) resetBackfillingToPending() error { 217 + func (n *Nexus) resetPartiallyResynced() error { 218 218 return n.db.Model(&models.Repo{}). 219 - Where("state = ?", models.RepoStateBackfilling). 220 - Update("state", models.RepoStatePending).Error 219 + Where("state = ?", models.RepoStateResyncing). 220 + Update("state", models.RepoStateDesynced).Error 221 221 }
+6 -5
nexus/models/models.go
··· 3 3 type RepoState string 4 4 5 5 const ( 6 - RepoStatePending RepoState = "pending" 7 - RepoStateBackfilling RepoState = "backfilling" 8 - RepoStateActive RepoState = "active" 9 - RepoStateError RepoState = "error" 6 + RepoStatePending RepoState = "pending" 7 + RepoStateResyncing RepoState = "resyncing" 8 + RepoStateActive RepoState = "active" 9 + RepoStateDesynced RepoState = "desynced" 10 + RepoStateError RepoState = "error" 10 11 ) 11 12 12 13 type Repo struct { ··· 22 23 Data string `gorm:"type:text;not null"` // JSON-encoded operations 23 24 } 24 25 25 - type BackfillBuffer struct { 26 + type ResyncBuffer struct { 26 27 ID uint `gorm:"primaryKey"` 27 28 Did string `gorm:"not null;index"` 28 29 Data string `gorm:"type:text;not null"` // JSON-encoded Commit
+5 -5
nexus/nexus.go
··· 43 43 return nil, err 44 44 } 45 45 46 - if err := db.AutoMigrate(&models.Repo{}, &models.RepoRecord{}, &models.OutboxBuffer{}, &models.BackfillBuffer{}, &models.Cursor{}); err != nil { 46 + if err := db.AutoMigrate(&models.Repo{}, &models.RepoRecord{}, &models.OutboxBuffer{}, &models.ResyncBuffer{}, &models.Cursor{}); err != nil { 47 47 return nil, err 48 48 } 49 49 ··· 104 104 Callbacks: rsc, 105 105 } 106 106 107 - // crash recovery: reset any backfilling repos to pending on service startup 108 - if err := n.resetBackfillingToPending(); err != nil { 107 + // crash recovery: reset any partially repos 108 + if err := n.resetPartiallyResynced(); err != nil { 109 109 return nil, err 110 110 } 111 111 112 - for i := 0; i < 50; i++ { 113 - go n.runBackfillWorker(context.Background(), i) 112 + for i := 0; i < 20; i++ { 113 + go n.runResyncWorker(context.Background(), i) 114 114 } 115 115 116 116 go n.EventProcessor.RunCursorSaver(context.Background(), cursorSaveInterval)
+22 -11
nexus/processor.go
··· 39 39 return nil 40 40 } 41 41 42 - if d.State == models.RepoStatePending { 42 + if d.State != models.RepoStateActive && d.State != models.RepoStateResyncing { 43 43 return nil 44 44 } 45 45 ··· 51 51 if evt.PrevData == nil { 52 52 ep.Logger.Debug("legacy commit event, skipping prev data check", "did", evt.Repo, "rev", evt.Rev) 53 53 } else if evt.PrevData.String() != d.PrevData { 54 - // @TODO DESYNCED 55 54 ep.Logger.Warn("repo state desynced", "did", evt.Repo, "rev", evt.Rev) 55 + // gets picked up by resync workers 56 + if err := ep.UpdateRepoState(evt.Repo, models.RepoStateDesynced); err != nil { 57 + ep.Logger.Error("failed to update repo state to desynced", "did", evt.Repo, "error", err) 58 + return err 59 + } 60 + return nil 56 61 } 57 62 58 63 commit, err := ep.validateCommit(ctx, evt) ··· 61 66 return err 62 67 } 63 68 64 - if d.State == models.RepoStateBackfilling { 65 - if err := ep.addToBackfillBuffer(commit); err != nil { 69 + if d.State == models.RepoStateResyncing { 70 + if err := ep.addToResyncBuffer(commit); err != nil { 66 71 ep.Logger.Error("failed to buffer commit", "did", evt.Repo, "error", err) 67 72 return err 68 73 } ··· 178 183 }) 179 184 } 180 185 181 - func (ep *EventProcessor) addToBackfillBuffer(commit *Commit) error { 186 + func (ep *EventProcessor) addToResyncBuffer(commit *Commit) error { 182 187 jsonData, err := json.Marshal(commit) 183 188 if err != nil { 184 189 return err 185 190 } 186 - return ep.DB.Create(&models.BackfillBuffer{ 191 + return ep.DB.Create(&models.ResyncBuffer{ 187 192 Did: commit.Did, 188 193 Data: string(jsonData), 189 194 }).Error 190 195 } 191 196 192 - func (ep *EventProcessor) drainBackfillBuffer(ctx context.Context, did string) error { 193 - var bufferedEvts []models.BackfillBuffer 197 + func (ep *EventProcessor) drainResyncBuffer(ctx context.Context, did string) error { 198 + var bufferedEvts []models.ResyncBuffer 194 199 if err := ep.DB.Where("did = ?", did).Order("id ASC").Find(&bufferedEvts).Error; err != nil { 195 200 return fmt.Errorf("failed to load buffered events: %w", err) 196 201 } ··· 199 204 return nil 200 205 } 201 206 202 - ep.Logger.Info("processing buffered backfill events", "did", did, "count", len(bufferedEvts)) 207 + ep.Logger.Info("processing buffered resync events", "did", did, "count", len(bufferedEvts)) 203 208 204 209 for _, evt := range bufferedEvts { 205 210 var commit Commit ··· 219 224 return err 220 225 } 221 226 222 - if err := ep.DB.Delete(&models.BackfillBuffer{}, "id = ?", evt.ID).Error; err != nil { 227 + if err := ep.DB.Delete(&models.ResyncBuffer{}, "id = ?", evt.ID).Error; err != nil { 223 228 ep.Logger.Error("failed to delete buffered event", "id", evt.ID, "did", commit.Did, "rev", commit.Rev, "error", err) 224 229 return err 225 230 } 226 231 } 227 232 228 - ep.Logger.Info("processed buffered backfill events", "did", did, "count", len(bufferedEvts)) 233 + ep.Logger.Info("processed buffered resync events", "did", did, "count", len(bufferedEvts)) 229 234 return nil 230 235 } 231 236 ··· 280 285 } 281 286 return cursor.Cursor, nil 282 287 } 288 + 289 + func (ep *EventProcessor) UpdateRepoState(did string, state models.RepoState) error { 290 + return ep.DB.Model(&models.Repo{}). 291 + Where("did = ?", did). 292 + Update("state", state).Error 293 + }