this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

buffer live events while backfilling

dholms edb89a68 424da82b

+99 -1
+50
nexus/backfill.go
··· 32 32 if err := n.UpdateRepoState(did, models.RepoStateActive, rev, ""); err != nil { 33 33 return fmt.Errorf("failed to update state to active %w", err) 34 34 } 35 + 36 + if err := n.processBufferedEvents(ctx, did); err != nil { 37 + n.logger.Error("failed to process buffered events", "did", did, "error", err) 38 + } 39 + 35 40 return nil 36 41 } 37 42 ··· 157 162 n.logger.Info("backfill repo complete", "did", did, "records", numRecords, "rev", rev) 158 163 return rev, nil 159 164 } 165 + 166 + func (n *Nexus) processBufferedEvents(ctx context.Context, did string) error { 167 + var bufferedEvts []models.BackfillBuffer 168 + if err := n.db.Where("did = ?", did).Order("id ASC").Find(&bufferedEvts).Error; err != nil { 169 + return fmt.Errorf("failed to load buffered events: %w", err) 170 + } 171 + 172 + if len(bufferedEvts) == 0 { 173 + return nil 174 + } 175 + 176 + n.logger.Info("processing buffered backfill events", "did", did, "count", len(bufferedEvts)) 177 + 178 + for _, evt := range bufferedEvts { 179 + op := &Op{ 180 + Did: evt.Did, 181 + Collection: evt.Collection, 182 + Rkey: evt.Rkey, 183 + Action: evt.Action, 184 + Cid: evt.Cid, 185 + } 186 + 187 + if err := n.outbox.Send(op); err != nil { 188 + return fmt.Errorf("failed to send buffered event: %w", err) 189 + } 190 + 191 + repoRecord := models.RepoRecord{ 192 + Did: evt.Did, 193 + Collection: evt.Collection, 194 + Rkey: evt.Rkey, 195 + Cid: evt.Cid, 196 + Rev: evt.Rev, 197 + } 198 + if err := n.db.Save(&repoRecord).Error; err != nil { 199 + n.logger.Error("failed to save repo record from buffered event", "did", evt.Did, "error", err) 200 + } 201 + } 202 + 203 + if err := n.db.Where("did = ?", did).Delete(&models.BackfillBuffer{}).Error; err != nil { 204 + n.logger.Error("failed to delete buffered backfill events", "did", did, "error", err) 205 + } 206 + 207 + n.logger.Info("processed buffered backfill events", "did", did, "count", len(bufferedEvts)) 208 + return nil 209 + }
+28
nexus/firehose.go
··· 67 67 return nil 68 68 } 69 69 70 + state, err := n.GetRepoState(evt.Repo) 71 + if err != nil { 72 + n.logger.Error("failed to get repo state", "did", evt.Repo, "error", err) 73 + return nil 74 + } 75 + 76 + if state == models.RepoStatePending { 77 + return nil 78 + } else if state == models.RepoStateBackfilling { 79 + return n.bufferCommitEvent(evt) 80 + } 81 + 70 82 r, err := repo.VerifyCommitMessage(ctx, evt) 71 83 if err != nil { 72 84 n.logger.Info("failed to verify commit", "did", evt.Repo, "error", err) ··· 142 154 143 155 return nil 144 156 } 157 + 158 + func (n *Nexus) bufferCommitEvent(evt *comatproto.SyncSubscribeRepos_Commit) error { 159 + for _, op := range evt.Ops { 160 + bufferedEvt := models.BufferedEvt{ 161 + Did: evt.Repo, 162 + Collection: op.Path[:len(op.Path)-len(op.Path[len(op.Path)-1:])], // extract collection from path 163 + Rkey: op.Path[len(op.Path)-1:], // extract rkey from path 164 + Action: op.Action, 165 + Cid: op.Cid.String(), 166 + } 167 + if err := n.db.Create(&bufferedEvt).Error; err != nil { 168 + n.logger.Error("failed to buffer event", "did", evt.Repo, "path", op.Path, "error", err) 169 + } 170 + } 171 + return nil 172 + }
+12
nexus/models/models.go
··· 30 30 Record string `gorm:"type:text"` 31 31 } 32 32 33 + type BackfillBuffer struct { 34 + ID uint `gorm:"primaryKey"` 35 + Did string `gorm:"not null;index"` 36 + Collection string `gorm:"not null"` 37 + Rkey string `gorm:"not null"` 38 + Action string `gorm:"not null"` 39 + Cid string `gorm:"type:text"` 40 + Record string `gorm:"type:text"` 41 + Rev string `gorm:"not null"` 42 + CreatedAt time.Time `gorm:"not null"` 43 + } 44 + 33 45 type RepoRecord struct { 34 46 Did string `gorm:"primaryKey"` 35 47 Collection string `gorm:"primaryKey"`
+9 -1
nexus/nexus.go
··· 43 43 return nil, err 44 44 } 45 45 46 - if err := db.AutoMigrate(&models.BufferedEvt{}, &models.FilterDid{}, &models.RepoRecord{}); err != nil { 46 + if err := db.AutoMigrate(&models.BufferedEvt{}, &models.FilterDid{}, &models.RepoRecord{}, &models.BackfillBuffer{}); err != nil { 47 47 return nil, err 48 48 } 49 49 ··· 144 144 func (n *Nexus) queueBackfill(did string) { 145 145 depth := n.backfillQueue.Enqueue(did) 146 146 n.logger.Info("queued backfill", "did", did, "queue_depth", depth) 147 + } 148 + 149 + func (n *Nexus) GetRepoState(did string) (models.RepoState, error) { 150 + var filterDid models.FilterDid 151 + if err := n.db.First(&filterDid, "did = ?", did).Error; err != nil { 152 + return "", err 153 + } 154 + return filterDid.State, nil 147 155 } 148 156 149 157 func (n *Nexus) UpdateRepoState(did string, state models.RepoState, rev string, errorMsg string) error {