this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

real job queue for backfilling

dholms b4aecdb4 b5f0dfd3

+84 -158
+55
nexus/backfill.go
··· 6 6 "fmt" 7 7 "io" 8 8 "net/http" 9 + "time" 9 10 10 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 12 "github.com/bluesky-social/indigo/atproto/data" ··· 14 15 "github.com/bluesky-social/indigo/repo" 15 16 "github.com/bluesky-social/indigo/xrpc" 16 17 "github.com/ipfs/go-cid" 18 + "gorm.io/gorm" 17 19 ) 20 + 21 + func (n *Nexus) runBackfillWorker(ctx context.Context, workerID int) { 22 + logger := n.logger.With("worker", workerID) 23 + 24 + for { 25 + did, found, err := n.claimBackfillJob(ctx) 26 + if err != nil { 27 + logger.Error("failed to claim backfill job", "error", err) 28 + time.Sleep(3 * time.Second) 29 + continue 30 + } 31 + 32 + if !found { 33 + time.Sleep(3 * time.Second) 34 + continue 35 + } 36 + 37 + logger.Info("processing backfill", "did", did) 38 + err = n.backfillDid(ctx, did) 39 + if err != nil { 40 + logger.Error("backfill failed", "did", did, "error", err) 41 + } 42 + } 43 + } 44 + 45 + func (n *Nexus) claimBackfillJob(ctx context.Context) (string, bool, error) { 46 + var did models.Did 47 + err := n.db.Transaction(func(tx *gorm.DB) error { 48 + if err := tx.Where("state = ?", models.RepoStatePending). 49 + First(&did).Error; err != nil { 50 + return err 51 + } 52 + 53 + return tx.Model(&models.Did{}). 54 + Where("did = ?", did.Did). 55 + Update("state", models.RepoStateBackfilling).Error 56 + }) 57 + 58 + if err != nil { 59 + if err == gorm.ErrRecordNotFound { 60 + return "", false, nil 61 + } 62 + return "", false, err 63 + } 64 + 65 + return did.Did, true, nil 66 + } 18 67 19 68 func (n *Nexus) backfillDid(ctx context.Context, did string) error { 20 69 if err := n.db.Model(&models.Did{}). ··· 178 227 n.logger.Info("backfill repo complete", "did", did, "records", numRecords, "rev", rev) 179 228 return rev, nil 180 229 } 230 + 231 + func (n *Nexus) resetBackfillingToPending() error { 232 + return n.db.Model(&models.Did{}). 233 + Where("state = ?", models.RepoStateBackfilling). 234 + Update("state", models.RepoStatePending).Error 235 + }
+1 -7
nexus/handlers.go
··· 63 63 return echo.NewHTTPError(http.StatusInternalServerError) 64 64 } 65 65 66 - n.filter.AddBatch(payload.DIDs) 67 - 68 - for _, did := range payload.DIDs { 69 - n.queueBackfill(did) 70 - } 71 - 72 - n.logger.Info("added dids and queued backfills", "count", len(payload.DIDs)) 66 + n.logger.Info("added dids", "count", len(payload.DIDs)) 73 67 74 68 return c.JSON(http.StatusOK, map[string]interface{}{ 75 69 "count": len(payload.DIDs),
+1 -1
nexus/models/models.go
··· 11 11 12 12 type Did struct { 13 13 Did string `gorm:"primaryKey"` 14 - State RepoState `gorm:"not null;default:'pending'"` 14 + State RepoState `gorm:"not null;default:'pending';index"` 15 15 Rev string `gorm:"type:text"` 16 16 ErrorMsg string `gorm:"type:text"` 17 17 }
+15 -69
nexus/nexus.go
··· 20 20 echo *echo.Echo 21 21 logger *slog.Logger 22 22 23 - filter *StringSet 24 - Dir identity.Directory 23 + Dir identity.Directory 25 24 26 - outbox *Outbox 27 - backfillQueue *BackfillQueue 25 + outbox *Outbox 28 26 29 27 FirehoseConsumer *FirehoseConsumer 30 28 EventProcessor *EventProcessor ··· 38 36 } 39 37 40 38 func NewNexus(config NexusConfig) (*Nexus, error) { 41 - db, err := gorm.Open(sqlite.Open(config.DBPath), &gorm.Config{ 39 + db, err := gorm.Open(sqlite.Open(config.DBPath+"?_journal_mode=WAL"), &gorm.Config{ 42 40 Logger: logger.Default.LogMode(logger.Silent), 43 41 }) 44 42 if err != nil { ··· 64 62 echo: e, 65 63 logger: slog.Default().With("system", "nexus"), 66 64 67 - filter: NewStringSet(), 68 - Dir: &cdir, 65 + Dir: &cdir, 69 66 70 - outbox: NewOutbox(db), 71 - backfillQueue: NewBackfillQueue(), 67 + outbox: NewOutbox(db), 72 68 } 73 69 74 70 parallelism := config.FirehoseParallelism ··· 81 77 cursorSaveInterval = 5 * time.Second 82 78 } 83 79 84 - cursor, err := n.readLastCursor(context.Background(), config.RelayHost) 85 - if err != nil { 86 - return nil, err 87 - } 88 - 89 80 n.EventProcessor = &EventProcessor{ 90 81 Logger: n.logger.With("component", "processor"), 91 82 DB: db, ··· 94 85 Outbox: n.outbox, 95 86 } 96 87 88 + cursor, err := n.EventProcessor.ReadLastCursor(context.Background(), config.RelayHost) 89 + if err != nil { 90 + return nil, err 91 + } 92 + 97 93 rsc := &events.RepoStreamCallbacks{ 98 94 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 99 95 return n.EventProcessor.ProcessCommit(context.Background(), evt) ··· 108 104 Callbacks: rsc, 109 105 } 110 106 107 + // crash recovery: reset any backfilling repos to pending on service startup 108 + if err := n.resetBackfillingToPending(); err != nil { 109 + return nil, err 110 + } 111 + 111 112 for i := 0; i < 50; i++ { 112 113 go n.runBackfillWorker(context.Background(), i) 113 114 } 114 115 115 - // Start cursor saver goroutine 116 116 go n.EventProcessor.RunCursorSaver(context.Background(), cursorSaveInterval) 117 117 118 - err = n.LoadFilters() 119 - if err != nil { 120 - return nil, err 121 - } 122 - 123 118 n.registerRoutes() 124 119 125 120 return n, nil ··· 149 144 150 145 return nil 151 146 } 152 - 153 - func (n *Nexus) LoadFilters() error { 154 - var dids []models.Did 155 - if err := n.db.Find(&dids).Error; err != nil { 156 - return err 157 - } 158 - 159 - didStrings := make([]string, 0, len(dids)) 160 - for _, d := range dids { 161 - didStrings = append(didStrings, d.Did) 162 - 163 - if d.State == models.RepoStatePending || d.State == models.RepoStateBackfilling { 164 - n.queueBackfill(d.Did) 165 - } 166 - } 167 - 168 - n.filter.AddBatch(didStrings) 169 - return nil 170 - } 171 - 172 - func (n *Nexus) runBackfillWorker(ctx context.Context, workerID int) { 173 - logger := n.logger.With("worker", workerID) 174 - 175 - for { 176 - did := n.backfillQueue.Dequeue() 177 - logger.Info("processing backfill", "did", did) 178 - err := n.backfillDid(ctx, did) 179 - if err != nil { 180 - logger.Error("backfill failed", "did", did, "error", err) 181 - } 182 - } 183 - } 184 - 185 - func (n *Nexus) queueBackfill(did string) { 186 - depth := n.backfillQueue.Enqueue(did) 187 - n.logger.Info("queued backfill", "did", did, "queue_depth", depth) 188 - } 189 - 190 - func (n *Nexus) readLastCursor(ctx context.Context, relayHost string) (int64, error) { 191 - var cursor models.Cursor 192 - if err := n.db.Where("host = ?", relayHost).First(&cursor).Error; err != nil { 193 - if err == gorm.ErrRecordNotFound { 194 - n.logger.Info("no pre-existing cursor in database", "relayHost", relayHost) 195 - return 0, nil 196 - } 197 - return 0, err 198 - } 199 - return cursor.Cursor, nil 200 - }
+12
nexus/processor.go
··· 254 254 } 255 255 } 256 256 } 257 + 258 + func (ep *EventProcessor) ReadLastCursor(ctx context.Context, relayHost string) (int64, error) { 259 + var cursor models.Cursor 260 + if err := ep.DB.Where("host = ?", relayHost).First(&cursor).Error; err != nil { 261 + if err == gorm.ErrRecordNotFound { 262 + ep.Logger.Info("no pre-existing cursor in database", "relayHost", relayHost) 263 + return 0, nil 264 + } 265 + return 0, err 266 + } 267 + return cursor.Cursor, nil 268 + }
-46
nexus/queue.go
··· 1 - package main 2 - 3 - import "sync" 4 - 5 - type BackfillQueue struct { 6 - queue []string 7 - head int 8 - mu sync.Mutex 9 - cond *sync.Cond 10 - } 11 - 12 - func NewBackfillQueue() *BackfillQueue { 13 - q := &BackfillQueue{ 14 - queue: make([]string, 0), 15 - head: 0, 16 - } 17 - q.cond = sync.NewCond(&q.mu) 18 - return q 19 - } 20 - 21 - func (q *BackfillQueue) Enqueue(did string) int { 22 - q.mu.Lock() 23 - q.queue = append(q.queue, did) 24 - depth := len(q.queue) - q.head 25 - q.mu.Unlock() 26 - q.cond.Signal() 27 - return depth 28 - } 29 - 30 - func (q *BackfillQueue) Dequeue() string { 31 - q.mu.Lock() 32 - for q.head >= len(q.queue) { 33 - q.cond.Wait() 34 - } 35 - did := q.queue[q.head] 36 - q.head++ 37 - 38 - // trim queue every 10000 elements 39 - if q.head > 10000 { 40 - q.queue = append([]string(nil), q.queue[q.head:]...) 41 - q.head = 0 42 - } 43 - q.mu.Unlock() 44 - 45 - return did 46 - }
-35
nexus/stringset.go
··· 1 - package main 2 - 3 - import "sync" 4 - 5 - type StringSet struct { 6 - items map[string]bool 7 - mu sync.RWMutex 8 - } 9 - 10 - func NewStringSet() *StringSet { 11 - return &StringSet{ 12 - items: make(map[string]bool), 13 - } 14 - } 15 - 16 - func (s *StringSet) Add(item string) { 17 - s.mu.Lock() 18 - s.items[item] = true 19 - s.mu.Unlock() 20 - } 21 - 22 - func (s *StringSet) AddBatch(items []string) { 23 - s.mu.Lock() 24 - for _, item := range items { 25 - s.items[item] = true 26 - } 27 - s.mu.Unlock() 28 - } 29 - 30 - func (s *StringSet) Contains(item string) bool { 31 - s.mu.RLock() 32 - exists := s.items[item] 33 - s.mu.RUnlock() 34 - return exists 35 - }