this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor firehose methods

dholms 9c43382b 9a1fe128

+65 -44
+58 -31
nexus/firehose.go
··· 19 19 "gorm.io/gorm" 20 20 ) 21 21 22 + type Op struct { 23 + Did string `json:"did"` 24 + Collection string `json:"collection"` 25 + Rkey string `json:"rkey"` 26 + Action string `json:"action"` 27 + Record map[string]interface{} `json:"record,omitempty"` 28 + Cid string `json:"cid,omitempty"` 29 + } 30 + 22 31 type FirehoseConsumer struct { 23 32 RelayHost string 24 33 Filter *StringSet ··· 27 36 Parallelism int 28 37 PersistCursorEvery int 29 38 30 - OnCommit func(context.Context, *comatproto.SyncSubscribeRepos_Commit) error 39 + OnEvent func(context.Context, *Op) error 31 40 } 32 41 33 42 func (fc *FirehoseConsumer) Run(ctx context.Context) error { ··· 63 72 rsc := &events.RepoStreamCallbacks{ 64 73 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 65 74 if fc.Filter.Contains(evt.Repo) { 66 - err := fc.OnCommit(ctx, evt) 75 + err := fc.handleCommitEvent(ctx, evt) 67 76 if err != nil { 68 77 return err 69 78 } ··· 112 121 return fc.DB.Save(&cursor).Error 113 122 } 114 123 115 - func (n *Nexus) handleCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 116 - state, err := n.GetRepoState(evt.Repo) 124 + func (fc *FirehoseConsumer) getRepoState(did string) (models.RepoState, error) { 125 + var d models.Did 126 + if err := fc.DB.First(&d, "did = ?", did).Error; err != nil { 127 + return "", err 128 + } 129 + return d.State, nil 130 + } 131 + 132 + func (fc *FirehoseConsumer) updateRepoState(did string, state models.RepoState, rev string, errorMsg string) error { 133 + return fc.DB.Model(&models.Did{}). 134 + Where("did = ?", did). 135 + Updates(map[string]interface{}{ 136 + "state": state, 137 + "rev": rev, 138 + "error_msg": errorMsg, 139 + }).Error 140 + } 141 + 142 + func (fc *FirehoseConsumer) bufferCommitEvent(evt *comatproto.SyncSubscribeRepos_Commit) error { 143 + for _, op := range evt.Ops { 144 + bufferedEvt := models.BufferedEvt{ 145 + Did: evt.Repo, 146 + Collection: op.Path[:len(op.Path)-len(op.Path[len(op.Path)-1:])], // extract collection from path 147 + Rkey: op.Path[len(op.Path)-1:], // extract rkey from path 148 + Action: op.Action, 149 + Cid: op.Cid.String(), 150 + } 151 + if err := fc.DB.Create(&bufferedEvt).Error; err != nil { 152 + fc.Logger.Error("failed to buffer event", "did", evt.Repo, "path", op.Path, "error", err) 153 + } 154 + } 155 + return nil 156 + } 157 + 158 + func (fc *FirehoseConsumer) handleCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 159 + state, err := fc.getRepoState(evt.Repo) 117 160 if err != nil { 118 - n.logger.Error("failed to get repo state", "did", evt.Repo, "error", err) 161 + fc.Logger.Error("failed to get repo state", "did", evt.Repo, "error", err) 119 162 return nil 120 163 } 121 164 122 165 if state == models.RepoStatePending { 123 166 return nil 124 167 } else if state == models.RepoStateBackfilling { 125 - return n.bufferCommitEvent(evt) 168 + return fc.bufferCommitEvent(evt) 126 169 } 127 170 128 171 r, err := repo.VerifyCommitMessage(ctx, evt) 129 172 if err != nil { 130 - n.logger.Info("failed to verify commit", "did", evt.Repo, "error", err) 173 + fc.Logger.Info("failed to verify commit", "did", evt.Repo, "error", err) 131 174 return err 132 175 } 133 176 ··· 148 191 Action: "delete", 149 192 } 150 193 151 - if err := n.outbox.Send(outOp); err != nil { 194 + if err := fc.OnEvent(ctx, outOp); err != nil { 152 195 return err 153 196 } 154 197 155 - if err := n.db.Where("did = ? AND collection = ? AND rkey = ?", evt.Repo, collStr, rkeyStr).Delete(&models.RepoRecord{}).Error; err != nil { 156 - n.logger.Error("failed to delete repo record", "did", evt.Repo, "path", op.Path, "error", err) 198 + if err := fc.DB.Where("did = ? AND collection = ? AND rkey = ?", evt.Repo, collStr, rkeyStr).Delete(&models.RepoRecord{}).Error; err != nil { 199 + fc.Logger.Error("failed to delete repo record", "did", evt.Repo, "path", op.Path, "error", err) 157 200 } 158 201 continue 159 202 } ··· 178 221 Cid: cidStr, 179 222 } 180 223 181 - if err := n.outbox.Send(outOp); err != nil { 224 + if err := fc.OnEvent(ctx, outOp); err != nil { 182 225 return err 183 226 } 184 227 ··· 189 232 Cid: cidStr, 190 233 Rev: evt.Rev, 191 234 } 192 - if err := n.db.Save(&repoRecord).Error; err != nil { 193 - n.logger.Error("failed to save repo record", "did", evt.Repo, "path", op.Path, "error", err) 235 + if err := fc.DB.Save(&repoRecord).Error; err != nil { 236 + fc.Logger.Error("failed to save repo record", "did", evt.Repo, "path", op.Path, "error", err) 194 237 } 195 238 } 196 239 197 - if err := n.UpdateRepoState(evt.Repo, models.RepoStateActive, evt.Rev, ""); err != nil { 198 - n.logger.Error("failed to update rev", "did", evt.Repo, "error", err) 240 + if err := fc.updateRepoState(evt.Repo, models.RepoStateActive, evt.Rev, ""); err != nil { 241 + fc.Logger.Error("failed to update rev", "did", evt.Repo, "error", err) 199 242 } 200 243 201 244 return nil 202 245 } 203 - 204 - func (n *Nexus) bufferCommitEvent(evt *comatproto.SyncSubscribeRepos_Commit) error { 205 - for _, op := range evt.Ops { 206 - bufferedEvt := models.BufferedEvt{ 207 - Did: evt.Repo, 208 - Collection: op.Path[:len(op.Path)-len(op.Path[len(op.Path)-1:])], // extract collection from path 209 - Rkey: op.Path[len(op.Path)-1:], // extract rkey from path 210 - Action: op.Action, 211 - Cid: op.Cid.String(), 212 - } 213 - if err := n.db.Create(&bufferedEvt).Error; err != nil { 214 - n.logger.Error("failed to buffer event", "did", evt.Repo, "path", op.Path, "error", err) 215 - } 216 - } 217 - return nil 218 - }
+7 -13
nexus/nexus.go
··· 26 26 FirehoseConsumer *FirehoseConsumer 27 27 } 28 28 29 - type Op struct { 30 - Did string `json:"did"` 31 - Collection string `json:"collection"` 32 - Rkey string `json:"rkey"` 33 - Action string `json:"action"` 34 - Record map[string]interface{} `json:"record,omitempty"` 35 - Cid string `json:"cid,omitempty"` 36 - } 37 - 38 29 type NexusConfig struct { 39 - DBPath string 40 - RelayHost string 41 - FirehoseParallelism int 30 + DBPath string 31 + RelayHost string 32 + FirehoseParallelism int 42 33 FirehosePersistCursorEvery int 43 34 } 44 35 ··· 91 82 DB: db, 92 83 Parallelism: parallelism, 93 84 PersistCursorEvery: persistCursorEvery, 94 - OnCommit: n.handleCommitEvent, 85 + OnEvent: n.handleEvent, 95 86 } 96 87 97 88 // run 50 backfill workers ··· 189 180 }).Error 190 181 } 191 182 183 + func (n *Nexus) handleEvent(ctx context.Context, op *Op) error { 184 + return n.outbox.Send(op) 185 + }