this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

handle rate limiting & retries on errors

dholms 747f7df4 28cf3e90

+111 -69
+6 -23
nexus/firehose.go
··· 4 4 "context" 5 5 "fmt" 6 6 "log/slog" 7 - "math/rand" 8 7 "net/http" 9 8 "net/url" 10 9 "time" ··· 42 41 } 43 42 u.Path = "xrpc/com.atproto.sync.subscribeRepos" 44 43 45 - var backoff int 44 + var retries int 46 45 for { 47 46 select { 48 47 case <-ctx.Done(): ··· 60 59 } 61 60 urlStr := u.String() 62 61 63 - fc.Logger.Info("connecting to firehose", "url", urlStr, "cursor", cursor, "backoff", backoff) 62 + fc.Logger.Info("connecting to firehose", "url", urlStr, "cursor", cursor, "retries", retries) 64 63 65 64 dialer := websocket.DefaultDialer 66 65 con, _, err := dialer.DialContext(ctx, urlStr, http.Header{}) 67 66 if err != nil { 68 - fc.Logger.Warn("dialing failed", "err", err, "backoff", backoff) 69 - time.Sleep(sleepForBackoff(backoff)) 70 - backoff++ 67 + fc.Logger.Warn("dialing failed", "err", err, "retries", retries) 68 + time.Sleep(backoff(retries, 10)) 69 + retries++ 71 70 continue 72 71 } 73 72 74 73 fc.Logger.Info("connected to firehose") 75 - backoff = 0 74 + retries = 0 76 75 77 76 if err := events.HandleRepoStream(ctx, con, scheduler, nil); err != nil { 78 77 fc.Logger.Warn("firehose connection failed", "err", err) 79 78 } 80 79 } 81 80 } 82 - 83 - func sleepForBackoff(b int) time.Duration { 84 - if b == 0 { 85 - return 0 86 - } 87 - 88 - // exponential, capped at 10s 89 - duration := time.Second * (1 << b) 90 - if duration > time.Second*10 { 91 - duration = time.Second * 10 92 - } 93 - 94 - // Add jitter 95 - jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 96 - return duration + jitter 97 - }
+9 -7
nexus/models/models.go
··· 24 24 ) 25 25 26 26 type Repo struct { 27 - Did string `gorm:"primaryKey"` 28 - State RepoState `gorm:"not null;default:'pending';index"` 29 - Status AccountStatus `gorm:"not null;default:'active'"` 30 - Handle string `gorm:"type:text"` 31 - Rev string `gorm:"type:text"` 32 - PrevData string `gorm:"type:text"` 33 - ErrorMsg string `gorm:"type:text"` 27 + Did string `gorm:"primaryKey"` 28 + State RepoState `gorm:"not null;default:'pending';index"` 29 + Status AccountStatus `gorm:"not null;default:'active'"` 30 + Handle string `gorm:"type:text"` 31 + Rev string `gorm:"type:text"` 32 + PrevData string `gorm:"type:text"` 33 + ErrorMsg string `gorm:"type:text"` 34 + RetryCount int `gorm:"not null;default:0"` 35 + RetryAfter int64 `gorm:"not null;default:0"` // Unix timestamp 34 36 } 35 37 36 38 type OutboxBuffer struct {
+5 -1
nexus/nexus.go
··· 32 32 FullNetworkMode bool 33 33 CollectionFilters []string 34 34 35 - claimJobMu sync.Mutex 35 + claimJobMu sync.Mutex 36 + pdsBackoff map[string]time.Time 37 + pdsBackoffMu sync.RWMutex 36 38 } 37 39 38 40 type NexusConfig struct { ··· 88 90 89 91 FullNetworkMode: config.FullNetworkMode, 90 92 CollectionFilters: config.CollectionFilters, 93 + 94 + pdsBackoff: make(map[string]time.Time), 91 95 } 92 96 93 97 parallelism := config.FirehoseParallelism
+4 -16
nexus/outbox.go
··· 159 159 } 160 160 161 161 func (o *Outbox) sendWebhook(eventID uint, evt *OutboxEvt) { 162 - backoff := 0 162 + retries := 0 163 163 for { 164 164 if err := o.postWebhook(evt); err != nil { 165 - o.logger.Warn("webhook failed, retrying", "error", err, "id", eventID, "backoff", backoff) 166 - time.Sleep(webhookBackoff(backoff)) 167 - backoff++ 165 + o.logger.Warn("webhook failed, retrying", "error", err, "id", eventID, "retries", retries) 166 + time.Sleep(backoff(retries, 10)) 167 + retries++ 168 168 continue 169 169 } 170 170 ··· 172 172 o.AckEvent(eventID) 173 173 return 174 174 } 175 - } 176 - 177 - func webhookBackoff(attempt int) time.Duration { 178 - if attempt == 0 { 179 - return time.Second 180 - } 181 - // Exponential: 1s, 2s, 4s, 8s, cap at 10s 182 - duration := time.Second * (1 << attempt) 183 - if duration > 10*time.Second { 184 - duration = 10 * time.Second 185 - } 186 - return duration 187 175 } 188 176 189 177 func (o *Outbox) postWebhook(evt *OutboxEvt) error {
+61 -21
nexus/resync.go
··· 74 74 n.logger.Info("failed to refresh identity", "did", did, "error", err) 75 75 } 76 76 77 - err = n.doResync(ctx, did) 78 - if err != nil { 79 - n.db.Model(&models.Repo{}). 80 - Where("did = ?", did). 81 - Updates(map[string]interface{}{ 82 - "state": models.RepoStateError, 83 - "rev": "", 84 - "prev_data": "", 85 - "error_msg": err.Error(), 86 - }) 87 - return err 77 + success, err := n.doResync(ctx, did) 78 + if !success { 79 + return n.handleResyncError(did, err) 88 80 } 89 81 90 82 if err := n.EventProcessor.drainResyncBuffer(ctx, did); err != nil { ··· 96 88 97 89 const BATCH_SIZE = 100 98 90 99 - func (n *Nexus) doResync(ctx context.Context, did string) error { 91 + func (n *Nexus) doResync(ctx context.Context, did string) (bool, error) { 100 92 ident, err := n.Dir.LookupDID(ctx, syntax.DID(did)) 101 93 if err != nil { 102 - return fmt.Errorf("failed to resolve DID: %w", err) 94 + return false, fmt.Errorf("failed to resolve DID: %w", err) 103 95 } 104 96 105 97 pdsURL := ident.PDSEndpoint() 106 98 if pdsURL == "" { 107 - return fmt.Errorf("no PDS endpoint for DID: %s", did) 99 + return false, fmt.Errorf("no PDS endpoint for DID: %s", did) 100 + } 101 + 102 + n.pdsBackoffMu.RLock() 103 + backoffUntil, inBackoff := n.pdsBackoff[pdsURL] 104 + n.pdsBackoffMu.RUnlock() 105 + if inBackoff && time.Now().Before(backoffUntil) { 106 + return false, nil 108 107 } 109 108 110 109 n.logger.Info("fetching repo from PDS", "did", did, "pds", pdsURL) ··· 116 115 117 116 repoBytes, err := comatproto.SyncGetRepo(ctx, client, did, "") 118 117 if err != nil { 119 - return fmt.Errorf("failed to get repo: %w", err) 118 + if isRateLimitError(err) { 119 + n.pdsBackoffMu.Lock() 120 + n.pdsBackoff[pdsURL] = time.Now().Add(10 * time.Second) 121 + n.pdsBackoffMu.Unlock() 122 + return false, nil 123 + } 124 + return false, fmt.Errorf("failed to get repo: %w", err) 120 125 } 121 126 122 127 n.logger.Info("parsing repo CAR", "did", did, "size", len(repoBytes)) 123 128 124 129 commit, r, err := repo.LoadRepoFromCAR(ctx, bytes.NewReader(repoBytes)) 125 130 if err != nil { 126 - return fmt.Errorf("failed to read repo from CAR: %w", err) 131 + return false, fmt.Errorf("failed to read repo from CAR: %w", err) 127 132 } 128 133 129 134 rev := commit.Rev ··· 131 136 132 137 var existingRecords []models.RepoRecord 133 138 if err := n.db.Find(&existingRecords, "did = ?", did).Error; err != nil { 134 - return fmt.Errorf("failed to load existing records: %w", err) 139 + return false, fmt.Errorf("failed to load existing records: %w", err) 135 140 } 136 141 137 142 existingCids := make(map[string]string, len(existingRecords)) ··· 204 209 }) 205 210 206 211 if err != nil { 207 - return fmt.Errorf("failed to iterate repo: %w", err) 212 + return false, fmt.Errorf("failed to iterate repo: %w", err) 208 213 } 209 214 210 215 if err := n.writeBatch(evtBatch); err != nil { 211 - return fmt.Errorf("failed to flush final batch: %w", err) 216 + return false, fmt.Errorf("failed to flush final batch: %w", err) 212 217 } 213 218 214 219 if err := n.db.Model(&models.Repo{}). ··· 219 224 "prev_data": commit.Data.String(), 220 225 "error_msg": "", 221 226 }).Error; err != nil { 222 - return fmt.Errorf("failed to update repo state to active %w", err) 227 + return false, fmt.Errorf("failed to update repo state to active %w", err) 223 228 } 224 229 225 230 n.logger.Info("resync repo complete", "did", did, "rev", rev) 226 - return nil 231 + return true, nil 227 232 } 228 233 229 234 func (n *Nexus) writeBatch(evtBatch []*RecordEvt) error { ··· 261 266 262 267 n.outbox.Notify() 263 268 return nil 269 + } 270 + 271 + func (n *Nexus) handleResyncError(did string, err error) error { 272 + var state models.RepoState 273 + var errMsg string 274 + if err == nil { 275 + state = models.RepoStateDesynced 276 + errMsg = "" 277 + } else { 278 + state = models.RepoStateError 279 + errMsg = err.Error() 280 + } 281 + 282 + repo, err := n.EventProcessor.GetRepoState(did) 283 + if err != nil { 284 + return err 285 + } 286 + 287 + // start a 1 min & go up to 1 hr between retries 288 + retryAfter := time.Now().Add(60 * backoff(repo.RetryCount, 60)) 289 + 290 + dbErr := n.db.Model(&models.Repo{}). 291 + Where("did = ?", did). 292 + Updates(map[string]interface{}{ 293 + "state": state, 294 + "error_msg": errMsg, 295 + "retry_count": repo.RetryCount + 1, 296 + "retry_after": retryAfter, 297 + }).Error 298 + if dbErr != nil { 299 + return dbErr 300 + } else { 301 + return err 302 + } 303 + 264 304 } 265 305 266 306 func (n *Nexus) resetPartiallyResynced() error {
+26 -1
nexus/util.go
··· 1 1 package main 2 2 3 - import "strings" 3 + import ( 4 + "errors" 5 + "math/rand" 6 + "strings" 7 + "time" 8 + 9 + "github.com/bluesky-social/indigo/xrpc" 10 + ) 11 + 12 + func backoff(retries int, max int) time.Duration { 13 + dur := 1 << retries 14 + if dur > max { 15 + dur = max 16 + } 17 + 18 + jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 19 + return time.Second*time.Duration(dur) + jitter 20 + } 4 21 5 22 // matchesCollection checks if a collection matches any of the provided filters. 6 23 // Filters support wildcards at the end (e.g., "app.bsky.*" & "app.bsky.feed.*" both match "app.bsky.feed.post"). ··· 25 42 26 43 return false 27 44 } 45 + 46 + func isRateLimitError(err error) bool { 47 + var xrpcErr *xrpc.Error 48 + if errors.As(err, &xrpcErr) { 49 + return xrpcErr.IsThrottled() 50 + } 51 + return false 52 + }