this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

cursor management

dholms 3569e157 edb89a68

+74 -8
+35 -5
nexus/firehose.go
··· 5 5 "fmt" 6 6 "net/http" 7 7 "net/url" 8 + "sync/atomic" 8 9 9 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 10 11 "github.com/bluesky-social/indigo/atproto/data" ··· 16 17 "github.com/gorilla/websocket" 17 18 ) 18 19 20 + const persistCursorEvery = 100 21 + 19 22 func (n *Nexus) SubscribeFirehose(ctx context.Context) error { 20 - relayHost := "https://bsky.network" 23 + cur, err := n.ReadLastCursor(ctx) 24 + if err != nil { 25 + return err 26 + } 21 27 22 28 dialer := websocket.DefaultDialer 23 - u, err := url.Parse(relayHost) 29 + u, err := url.Parse(n.RelayHost) 24 30 if err != nil { 25 31 return fmt.Errorf("invalid relayHost URI: %w", err) 26 32 } ··· 31 37 u.Scheme = "wss" 32 38 } 33 39 u.Path = "xrpc/com.atproto.sync.subscribeRepos" 40 + if cur != 0 { 41 + u.RawQuery = fmt.Sprintf("cursor=%d", cur) 42 + } 34 43 urlString := u.String() 44 + n.logger.Info("subscribing to firehose", "relayHost", n.RelayHost, "cursor", cur) 35 45 con, _, err := dialer.Dial(urlString, http.Header{}) 36 46 if err != nil { 37 47 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 38 48 } 39 49 50 + var lastSeq atomic.Uint64 51 + var eventCount atomic.Uint64 52 + 40 53 rsc := &events.RepoStreamCallbacks{ 41 54 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 55 + lastSeq.Swap(uint64(evt.Seq)) 56 + if eventCount.Add(1)%persistCursorEvery == 0 { 57 + if err := n.PersistCursor(ctx, evt.Seq); err != nil { 58 + n.logger.Error("failed to persist cursor", "seq", evt.Seq, "error", err) 59 + } 60 + } 42 61 return n.handleCommitEvent(ctx, evt) 43 62 }, 44 63 RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error { 45 64 return nil 46 65 }, 47 66 RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 67 + lastSeq.Swap(uint64(evt.Seq)) 68 + if eventCount.Add(1)%persistCursorEvery == 0 { 69 + if err := n.PersistCursor(ctx, evt.Seq); err != nil { 70 + n.logger.Error("failed to persist cursor", "seq", evt.Seq, "error", err) 71 + } 72 + } 48 73 return nil 49 74 }, 50 75 RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { 76 + lastSeq.Swap(uint64(evt.Seq)) 77 + if eventCount.Add(1)%persistCursorEvery == 0 { 78 + if err := n.PersistCursor(ctx, evt.Seq); err != nil { 79 + n.logger.Error("failed to persist cursor", "seq", evt.Seq, "error", err) 80 + } 81 + } 51 82 return nil 52 83 }, 53 84 } 54 85 55 86 scheduler := parallel.NewScheduler( 56 - 1, 87 + 10, 57 88 100, 58 - relayHost, 89 + n.RelayHost, 59 90 rsc.EventHandler, 60 91 ) 61 - n.logger.Info("starting firehose consumer", "relayHost", relayHost) 62 92 return events.HandleRepoStream(ctx, con, scheduler, nil) 63 93 } 64 94
+2 -1
nexus/main.go
··· 11 11 12 12 func main() { 13 13 nexus, err := NewNexus(NexusConfig{ 14 - DBPath: "./nexus.db", 14 + DBPath: "./nexus.db", 15 + RelayHost: "https://bsky.network", 15 16 }) 16 17 if err != nil { 17 18 log.Fatal(err)
+5
nexus/models/models.go
··· 50 50 Rev string `gorm:"not null"` 51 51 UpdatedAt time.Time 52 52 } 53 + 54 + type Cursor struct { 55 + Host string `gorm:"primaryKey"` 56 + Cursor int64 `gorm:"not null"` 57 + }
+32 -2
nexus/nexus.go
··· 22 22 23 23 outbox *Outbox 24 24 backfillQueue *BackfillQueue 25 + 26 + RelayHost string 25 27 } 26 28 27 29 type Op struct { ··· 34 36 } 35 37 36 38 type NexusConfig struct { 37 - DBPath string 39 + DBPath string 40 + RelayHost string 38 41 } 39 42 40 43 func NewNexus(config NexusConfig) (*Nexus, error) { ··· 43 46 return nil, err 44 47 } 45 48 46 - if err := db.AutoMigrate(&models.BufferedEvt{}, &models.FilterDid{}, &models.RepoRecord{}, &models.BackfillBuffer{}); err != nil { 49 + if err := db.AutoMigrate(&models.BufferedEvt{}, &models.FilterDid{}, &models.RepoRecord{}, &models.BackfillBuffer{}, &models.Cursor{}); err != nil { 47 50 return nil, err 48 51 } 49 52 ··· 67 70 68 71 outbox: NewOutbox(db), 69 72 backfillQueue: NewBackfillQueue(), 73 + 74 + RelayHost: config.RelayHost, 70 75 } 71 76 72 77 // run 50 backfill workers ··· 163 168 "error_msg": errorMsg, 164 169 }).Error 165 170 } 171 + 172 + func (n *Nexus) ReadLastCursor(ctx context.Context) (int64, error) { 173 + var cursor models.Cursor 174 + if err := n.db.Where("host = ?", n.RelayHost).First(&cursor).Error; err != nil { 175 + if err == gorm.ErrRecordNotFound { 176 + n.logger.Info("no pre-existing cursor in database", "relayHost", n.RelayHost) 177 + return 0, nil 178 + } 179 + return 0, err 180 + } 181 + return cursor.Cursor, nil 182 + } 183 + 184 + func (n *Nexus) PersistCursor(ctx context.Context, seq int64) error { 185 + if seq <= 0 { 186 + return nil 187 + } 188 + 189 + cursor := models.Cursor{ 190 + Host: n.RelayHost, 191 + Cursor: seq, 192 + } 193 + 194 + return n.db.Save(&cursor).Error 195 + }