this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

tidy & fix cursor logic

dholms b5f0dfd3 b0a38708

+96 -72
+12 -10
nexus/nexus.go
··· 34 34 DBPath string 35 35 RelayHost string 36 36 FirehoseParallelism int 37 - FirehosePersistCursorEvery int 37 + FirehoseCursorSaveInterval time.Duration 38 38 } 39 39 40 40 func NewNexus(config NexusConfig) (*Nexus, error) { ··· 76 76 parallelism = 10 77 77 } 78 78 79 - persistCursorEvery := config.FirehosePersistCursorEvery 80 - if persistCursorEvery == 0 { 81 - persistCursorEvery = 100 79 + cursorSaveInterval := config.FirehoseCursorSaveInterval 80 + if cursorSaveInterval == 0 { 81 + cursorSaveInterval = 5 * time.Second 82 82 } 83 83 84 84 cursor, err := n.readLastCursor(context.Background(), config.RelayHost) ··· 87 87 } 88 88 89 89 n.EventProcessor = &EventProcessor{ 90 - Logger: n.logger.With("component", "processor"), 91 - DB: db, 92 - Dir: n.Dir, 93 - PersistCursorEvery: persistCursorEvery, 94 - RelayHost: config.RelayHost, 95 - Outbox: n.outbox, 90 + Logger: n.logger.With("component", "processor"), 91 + DB: db, 92 + Dir: n.Dir, 93 + RelayHost: config.RelayHost, 94 + Outbox: n.outbox, 96 95 } 97 96 98 97 rsc := &events.RepoStreamCallbacks{ ··· 112 111 for i := 0; i < 50; i++ { 113 112 go n.runBackfillWorker(context.Background(), i) 114 113 } 114 + 115 + // Start cursor saver goroutine 116 + go n.EventProcessor.RunCursorSaver(context.Background(), cursorSaveInterval) 115 117 116 118 err = n.LoadFilters() 117 119 if err != nil {
+43 -62
nexus/processor.go
··· 5 5 "encoding/json" 6 6 "fmt" 7 7 "log/slog" 8 - "sync/atomic" 8 + "sync" 9 + "time" 9 10 10 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 12 "github.com/bluesky-social/indigo/atproto/data" ··· 16 17 "gorm.io/gorm" 17 18 ) 18 19 19 - type Commit struct { 20 - Did string `json:"did"` 21 - Rev string `json:"rev"` 22 - Ops []CommitOp `json:"ops"` 23 - } 24 - 25 - type CommitOp struct { 26 - Collection string `json:"collection"` 27 - Rkey string `json:"rkey"` 28 - Action string `json:"action"` 29 - Record map[string]interface{} `json:"record,omitempty"` 30 - Cid string `json:"cid,omitempty"` 31 - } 32 - 33 - func (c *Commit) ToOps() []*Op { 34 - var ops []*Op 35 - for _, op := range c.Ops { 36 - ops = append(ops, &Op{ 37 - Did: c.Did, 38 - Rev: c.Rev, 39 - Collection: op.Collection, 40 - Rkey: op.Rkey, 41 - Action: op.Action, 42 - Record: op.Record, 43 - Cid: op.Cid, 44 - }) 45 - } 46 - return ops 47 - } 48 - 49 - type Op struct { 50 - Did string `json:"did"` 51 - Rev string `json:"rev"` 52 - Collection string `json:"collection"` 53 - Rkey string `json:"rkey"` 54 - Action string `json:"action"` 55 - Record map[string]interface{} `json:"record,omitempty"` 56 - Cid string `json:"cid,omitempty"` 57 - } 58 - 59 20 type EventProcessor struct { 60 - Logger *slog.Logger 61 - DB *gorm.DB 62 - Dir identity.Directory 63 - PersistCursorEvery int 64 - RelayHost string 65 - Outbox *Outbox 21 + Logger *slog.Logger 22 + DB *gorm.DB 23 + Dir identity.Directory 24 + RelayHost string 25 + Outbox *Outbox 66 26 67 - eventCount uint64 27 + lastSeq int64 28 + seqMu sync.Mutex 68 29 } 69 30 70 31 func (ep *EventProcessor) ProcessCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 71 - // @TODO this should happen at end of processing 72 - // Persist cursor periodically 73 - count := atomic.AddUint64(&ep.eventCount, 1) 74 - if count%uint64(ep.PersistCursorEvery) == 0 { 75 - if err := ep.persistCursor(ep.RelayHost, evt.Seq); err != nil { 76 - ep.Logger.Error("failed to persist cursor", "seq", evt.Seq, "error", err) 77 - } 78 - } 32 + defer ep.trackLastSeq(evt.Seq) 79 33 80 34 var d models.Did 81 35 if err := ep.DB.First(&d, "did = ?", evt.Repo).Error; err != nil { ··· 261 215 return nil 262 216 } 263 217 264 - func (ep *EventProcessor) persistCursor(relayHost string, seq int64) error { 265 - if seq <= 0 { 218 + func (ep *EventProcessor) trackLastSeq(seq int64) { 219 + ep.seqMu.Lock() 220 + ep.lastSeq = seq 221 + ep.seqMu.Unlock() 222 + } 223 + 224 + func (ep *EventProcessor) saveCursor(ctx context.Context) error { 225 + ep.seqMu.Lock() 226 + seq := ep.lastSeq 227 + ep.seqMu.Unlock() 228 + 229 + if seq == 0 { 266 230 return nil 267 231 } 268 232 269 - cursor := models.Cursor{ 270 - Host: relayHost, 233 + return ep.DB.Save(&models.Cursor{ 234 + Host: ep.RelayHost, 271 235 Cursor: seq, 236 + }).Error 237 + } 238 + 239 + func (ep *EventProcessor) RunCursorSaver(ctx context.Context, interval time.Duration) { 240 + ticker := time.NewTicker(interval) 241 + defer ticker.Stop() 242 + 243 + for { 244 + select { 245 + case <-ctx.Done(): 246 + if err := ep.saveCursor(ctx); err != nil { 247 + ep.Logger.Error("failed to save cursor on shutdown", "error", err) 248 + } 249 + return 250 + case <-ticker.C: 251 + if err := ep.saveCursor(ctx); err != nil { 252 + ep.Logger.Error("failed to save cursor", "error", err) 253 + } 254 + } 272 255 } 273 - 274 - return ep.DB.Save(&cursor).Error 275 256 }
+41
nexus/types.go
··· 1 + package main 2 + 3 + type Commit struct { 4 + Did string `json:"did"` 5 + Rev string `json:"rev"` 6 + Ops []CommitOp `json:"ops"` 7 + } 8 + 9 + type CommitOp struct { 10 + Collection string `json:"collection"` 11 + Rkey string `json:"rkey"` 12 + Action string `json:"action"` 13 + Record map[string]interface{} `json:"record,omitempty"` 14 + Cid string `json:"cid,omitempty"` 15 + } 16 + 17 + func (c *Commit) ToOps() []*Op { 18 + var ops []*Op 19 + for _, op := range c.Ops { 20 + ops = append(ops, &Op{ 21 + Did: c.Did, 22 + Rev: c.Rev, 23 + Collection: op.Collection, 24 + Rkey: op.Rkey, 25 + Action: op.Action, 26 + Record: op.Record, 27 + Cid: op.Cid, 28 + }) 29 + } 30 + return ops 31 + } 32 + 33 + type Op struct { 34 + Did string `json:"did"` 35 + Rev string `json:"rev"` 36 + Collection string `json:"collection"` 37 + Rkey string `json:"rkey"` 38 + Action string `json:"action"` 39 + Record map[string]interface{} `json:"record,omitempty"` 40 + Cid string `json:"cid,omitempty"` 41 + }