This repository has no description.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor firehose

dholms 0c4350b5 3569e157

+74 -64
+49 -35
nexus/firehose.go
··· 3 3 import ( 4 4 "context" 5 5 "fmt" 6 + "log/slog" 6 7 "net/http" 7 8 "net/url" 8 9 "sync/atomic" ··· 15 16 "github.com/bluesky-social/indigo/events/schedulers/parallel" 16 17 "github.com/bluesky-social/indigo/nexus/models" 17 18 "github.com/gorilla/websocket" 19 + "gorm.io/gorm" 18 20 ) 19 21 20 - const persistCursorEvery = 100 22 + type FirehoseConsumer struct { 23 + RelayHost string 24 + Filter *StringSet 25 + Logger *slog.Logger 26 + DB *gorm.DB 27 + Parallelism int 28 + PersistCursorEvery int 29 + 30 + OnCommit func(context.Context, *comatproto.SyncSubscribeRepos_Commit) error 31 + } 21 32 22 - func (n *Nexus) SubscribeFirehose(ctx context.Context) error { 23 - cur, err := n.ReadLastCursor(ctx) 33 + func (fc *FirehoseConsumer) Run(ctx context.Context) error { 34 + cur, err := fc.readLastCursor(ctx) 24 35 if err != nil { 25 36 return err 26 37 } 27 38 28 39 dialer := websocket.DefaultDialer 29 - u, err := url.Parse(n.RelayHost) 40 + u, err := url.Parse(fc.RelayHost) 30 41 if err != nil { 31 42 return fmt.Errorf("invalid relayHost URI: %w", err) 32 43 } ··· 41 52 u.RawQuery = fmt.Sprintf("cursor=%d", cur) 42 53 } 43 54 urlString := u.String() 44 - n.logger.Info("subscribing to firehose", "relayHost", n.RelayHost, "cursor", cur) 55 + fc.Logger.Info("subscribing to firehose", "relayHost", fc.RelayHost, "cursor", cur) 45 56 con, _, err := dialer.Dial(urlString, http.Header{}) 46 57 if err != nil { 47 58 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 48 59 } 49 60 50 - var lastSeq atomic.Uint64 51 61 var eventCount atomic.Uint64 52 62 53 63 rsc := &events.RepoStreamCallbacks{ 54 64 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 55 - lastSeq.Swap(uint64(evt.Seq)) 56 - if eventCount.Add(1)%persistCursorEvery == 0 { 57 - if err := n.PersistCursor(ctx, evt.Seq); err != nil { 58 - n.logger.Error("failed to persist cursor", "seq", evt.Seq, "error", err) 65 + if eventCount.Add(1)%uint64(fc.PersistCursorEvery) == 0 { 66 + if 
err := fc.persistCursor(ctx, evt.Seq); err != nil { 67 + fc.Logger.Error("failed to persist cursor", "seq", evt.Seq, "error", err) 59 68 } 60 69 } 61 - return n.handleCommitEvent(ctx, evt) 62 - }, 63 - RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error { 64 - return nil 65 - }, 66 - RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 67 - lastSeq.Swap(uint64(evt.Seq)) 68 - if eventCount.Add(1)%persistCursorEvery == 0 { 69 - if err := n.PersistCursor(ctx, evt.Seq); err != nil { 70 - n.logger.Error("failed to persist cursor", "seq", evt.Seq, "error", err) 71 - } 72 - } 73 - return nil 74 - }, 75 - RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { 76 - lastSeq.Swap(uint64(evt.Seq)) 77 - if eventCount.Add(1)%persistCursorEvery == 0 { 78 - if err := n.PersistCursor(ctx, evt.Seq); err != nil { 79 - n.logger.Error("failed to persist cursor", "seq", evt.Seq, "error", err) 80 - } 70 + 71 + if !fc.Filter.Contains(evt.Repo) { 72 + return nil 81 73 } 82 - return nil 74 + 75 + return fc.OnCommit(ctx, evt) 83 76 }, 84 77 } 85 78 86 79 scheduler := parallel.NewScheduler( 87 - 10, 80 + fc.Parallelism, 88 81 100, 89 - n.RelayHost, 82 + fc.RelayHost, 90 83 rsc.EventHandler, 91 84 ) 92 85 return events.HandleRepoStream(ctx, con, scheduler, nil) 93 86 } 94 87 95 - func (n *Nexus) handleCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 96 - if !n.filter.Contains(evt.Repo) { 88 + func (fc *FirehoseConsumer) readLastCursor(ctx context.Context) (int64, error) { 89 + var cursor models.Cursor 90 + if err := fc.DB.Where("host = ?", fc.RelayHost).First(&cursor).Error; err != nil { 91 + if err == gorm.ErrRecordNotFound { 92 + fc.Logger.Info("no pre-existing cursor in database", "relayHost", fc.RelayHost) 93 + return 0, nil 94 + } 95 + return 0, err 96 + } 97 + return cursor.Cursor, nil 98 + } 99 + 100 + func (fc *FirehoseConsumer) persistCursor(ctx context.Context, seq int64) error { 101 + if seq <= 0 { 97 102 
return nil 98 103 } 99 104 105 + cursor := models.Cursor{ 106 + Host: fc.RelayHost, 107 + Cursor: seq, 108 + } 109 + 110 + return fc.DB.Save(&cursor).Error 111 + } 112 + 113 + func (n *Nexus) handleCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 100 114 state, err := n.GetRepoState(evt.Repo) 101 115 if err != nil { 102 116 n.logger.Error("failed to get repo state", "did", evt.Repo, "error", err)
+1 -1
nexus/main.go
··· 20 20 21 21 fhCtx, fhCancel := context.WithCancel(context.Background()) 22 22 go func() { 23 - err := nexus.SubscribeFirehose(fhCtx) 23 + err := nexus.FirehoseConsumer.Run(fhCtx) 24 24 if err != nil { 25 25 log.Printf("Firehose error: %v", err) 26 26 }
+24 -28
nexus/nexus.go
··· 23 23 outbox *Outbox 24 24 backfillQueue *BackfillQueue 25 25 26 - RelayHost string 26 + FirehoseConsumer *FirehoseConsumer 27 27 } 28 28 29 29 type Op struct { ··· 36 36 } 37 37 38 38 type NexusConfig struct { 39 - DBPath string 40 - RelayHost string 39 + DBPath string 40 + RelayHost string 41 + FirehoseParallelism int 42 + FirehosePersistCursorEvery int 41 43 } 42 44 43 45 func NewNexus(config NexusConfig) (*Nexus, error) { ··· 70 72 71 73 outbox: NewOutbox(db), 72 74 backfillQueue: NewBackfillQueue(), 75 + } 73 76 74 - RelayHost: config.RelayHost, 77 + parallelism := config.FirehoseParallelism 78 + if parallelism == 0 { 79 + parallelism = 10 80 + } 81 + 82 + persistCursorEvery := config.FirehosePersistCursorEvery 83 + if persistCursorEvery == 0 { 84 + persistCursorEvery = 100 85 + } 86 + 87 + n.FirehoseConsumer = &FirehoseConsumer{ 88 + RelayHost: config.RelayHost, 89 + Filter: n.filter, 90 + Logger: n.logger.With("component", "firehose"), 91 + DB: db, 92 + Parallelism: parallelism, 93 + PersistCursorEvery: persistCursorEvery, 94 + OnCommit: n.handleCommitEvent, 75 95 } 76 96 77 97 // run 50 backfill workers ··· 169 189 }).Error 170 190 } 171 191 172 - func (n *Nexus) ReadLastCursor(ctx context.Context) (int64, error) { 173 - var cursor models.Cursor 174 - if err := n.db.Where("host = ?", n.RelayHost).First(&cursor).Error; err != nil { 175 - if err == gorm.ErrRecordNotFound { 176 - n.logger.Info("no pre-existing cursor in database", "relayHost", n.RelayHost) 177 - return 0, nil 178 - } 179 - return 0, err 180 - } 181 - return cursor.Cursor, nil 182 - } 183 - 184 - func (n *Nexus) PersistCursor(ctx context.Context, seq int64) error { 185 - if seq <= 0 { 186 - return nil 187 - } 188 - 189 - cursor := models.Cursor{ 190 - Host: n.RelayHost, 191 - Cursor: seq, 192 - } 193 - 194 - return n.db.Save(&cursor).Error 195 - }