this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

progress on rate-limiters

+106 -133
+2 -2
cmd/relay/handlers_admin.go
··· 424 424 } 425 425 426 426 type RateLimitChangeRequest struct { 427 - Host string `json:"host"` 428 - relay.HostRates 427 + Host string `json:"host"` 428 + RepoLimit int64 `json:"repo_limit,omitempty"` 429 429 } 430 430 431 431 /* XXX: finish rate limit stuff
+4 -2
cmd/relay/relay/account_test.go
··· 3 3 import ( 4 4 "testing" 5 5 6 + "github.com/bluesky-social/indigo/atproto/syntax" 7 + 6 8 "github.com/stretchr/testify/assert" 7 9 ) 8 10 ··· 14 16 func TestNormalizeDID(t *testing.T) { 15 17 assert := assert.New(t) 16 18 17 - fixtures := []HostnameFixture{ 19 + fixtures := []DIDFixture{ 18 20 DIDFixture{Val: "did:web:example.com", Norm: "did:web:example.com"}, 19 21 DIDFixture{Val: "did:web:example.com", Norm: "did:web:example.com"}, 20 22 DIDFixture{Val: "did:web:EXAMPLE.com", Norm: "did:web:example.com"}, ··· 23 25 } 24 26 25 27 for _, f := range fixtures { 26 - assert.Equal(f.Norm, NormalizeDID(f.Val)) 28 + assert.Equal(f.Norm, NormalizeDID(syntax.DID(f.Val)).String()) 27 29 } 28 30 }
+11 -32
cmd/relay/relay/crawl.go
··· 2 2 3 3 import ( 4 4 "fmt" 5 - "strings" 6 5 7 6 "github.com/bluesky-social/indigo/cmd/relay/relay/models" 8 7 ) 9 8 10 - // Checks whether a host is allowed to be subscribed to 11 - // 12 - // Must be called with the slurper lock held 13 - func (r *Relay) canSlurpHost(hostname string) bool { 14 - // Check if we're over the limit for new hosts today 15 - if !r.HostPerDayLimiter.Allow() { 16 - return false 17 - } 18 - 19 - // Check if the host is a trusted domain 20 - for _, d := range r.Config.TrustedDomains { 21 - // If the domain starts with a *., it's a wildcard 22 - if strings.HasPrefix(d, "*.") { 23 - // Cut off the * so we have .domain.com 24 - if strings.HasSuffix(hostname, strings.TrimPrefix(d, "*")) { 25 - return true 26 - } 27 - } else { 28 - if hostname == d { 29 - return true 30 - } 31 - } 32 - } 33 - 34 - return true 35 - } 36 - 37 9 func (r *Relay) SubscribeToHost(hostname string, noSSL, adminForce bool) error { 38 10 39 11 // if we already have an active subscription, exit early ··· 50 22 51 23 if host.ID == 0 { 52 24 newHost = true 53 - if !adminForce && !r.canSlurpHost(hostname) { 25 + 26 + // check if we're over the limit for new hosts today (bypass if admin mode) 27 + if !adminForce && !r.HostPerDayLimiter.Allow() { 54 28 // TODO: is this the correct error code? 55 29 return ErrNewSubsDisabled 56 30 } 57 31 58 - // XXX: new host daily rate-limit 32 + accountLimit := r.Config.DefaultRepoLimit 33 + trusted := IsTrustedHostname(hostname, r.Config.TrustedDomains) 34 + if trusted { 35 + accountLimit = r.Config.TrustedRepoLimit 36 + } 59 37 60 38 host = models.Host{ 61 39 Hostname: hostname, 62 40 NoSSL: noSSL, 63 41 Status: models.HostStatusActive, 64 - AccountLimit: r.Config.DefaultRepoLimit, 42 + Trusted: trusted, 43 + AccountLimit: accountLimit, 65 44 } 66 45 67 - if err := r.db.Create(&newHost).Error; err != nil { 46 + if err := r.db.Create(&host).Error; err != nil { 68 47 return err 69 48 } 70 49
+18
cmd/relay/relay/host.go
··· 67 67 return r.db.Model(models.Host{}).Where("id = ?", hostID).Update("status", status).Error 68 68 } 69 69 70 + func (r *Relay) UpdateHostAccountLimit(ctx context.Context, hostID uint64, accountLimit int64) error { 71 + 72 + host, err := r.GetHostByID(ctx, hostID) 73 + if err != nil { 74 + return err 75 + } 76 + 77 + if err := r.db.Model(models.Host{}).Where("id = ?", hostID).Update("account_limit", accountLimit).Error; err != nil { 78 + return err 79 + } 80 + 81 + if r.Slurper.CheckIfSubscribed(host.Hostname) { 82 + return r.Slurper.UpdateLimiters(host.Hostname, accountLimit, host.Trusted) 83 + } 84 + 85 + return nil 86 + } 87 + 70 88 // Persists all the host cursors in a single database transaction 71 89 // 72 90 // Note that in some situations this may have partial success.
+2 -5
cmd/relay/relay/models/models.go
··· 37 37 // maximum number of active accounts 38 38 AccountLimit int64 `gorm:"column:account_limit"` 39 39 40 - // TODO: ThrottleUntil time.Time 41 - 42 40 // indicates this is a highly trusted host (PDS), and different rate limits apply 43 41 Trusted bool `gorm:"column:trusted;default:false"` 44 42 ··· 79 77 80 78 // this is a reference to the ID field on Host; but it is not an explicit foreign key 81 79 HostID uint64 `gorm:"column:host_id;not null"` 82 - Status AccountStatus `gorm:"column:status;default:active"` 83 - UpstreamStatus AccountStatus `gorm:"column:upstream_status;default:active"` 84 - ThrottleUntil time.Time `gorm:"column:throttle_util"` 80 + Status AccountStatus `gorm:"column:status;not null;default:active"` 81 + UpstreamStatus AccountStatus `gorm:"column:upstream_status;not null;default:active"` 85 82 } 86 83 87 84 func (Account) TableName() string {
-25
cmd/relay/relay/rate_limits.go
··· 1 - package relay 2 - 3 - type HostRates struct { 4 - // core event rate, counts firehose events 5 - PerSecond int64 `json:"per_second,omitempty"` 6 - PerHour int64 `json:"per_hour,omitempty"` 7 - PerDay int64 `json:"per_day,omitempty"` 8 - 9 - RepoLimit int64 `json:"repo_limit,omitempty"` 10 - } 11 - 12 - func (pr *HostRates) FromSlurper(s *Slurper) { 13 - if pr.PerSecond == 0 { 14 - pr.PerHour = s.Config.DefaultPerSecondLimit 15 - } 16 - if pr.PerHour == 0 { 17 - pr.PerHour = s.Config.DefaultPerHourLimit 18 - } 19 - if pr.PerDay == 0 { 20 - pr.PerDay = s.Config.DefaultPerDayLimit 21 - } 22 - if pr.RepoLimit == 0 { 23 - pr.RepoLimit = s.Config.DefaultRepoLimit 24 - } 25 - }
+2 -1
cmd/relay/relay/relay.go
··· 39 39 type RelayConfig struct { 40 40 UserAgent string 41 41 DefaultRepoLimit int64 42 + TrustedRepoLimit int64 42 43 ConcurrencyPerHost int 43 44 QueueDepthPerHost int 44 45 LenientSyncValidation bool ··· 54 55 return &RelayConfig{ 55 56 UserAgent: "indigo-relay", 56 57 DefaultRepoLimit: 100, 58 + TrustedRepoLimit: 10_000_000, 57 59 ConcurrencyPerHost: 40, 58 60 QueueDepthPerHost: 1000, 59 61 HostPerDayLimit: 50, ··· 93 95 } 94 96 95 97 slurpConfig := DefaultSlurperConfig() 96 - slurpConfig.DefaultRepoLimit = config.DefaultRepoLimit 97 98 slurpConfig.ConcurrencyPerHost = config.ConcurrencyPerHost 98 99 slurpConfig.QueueDepthPerHost = config.QueueDepthPerHost 99 100
+67 -66
cmd/relay/relay/slurper.go
··· 37 37 subsLk sync.Mutex 38 38 subs map[string]*Subscription 39 39 40 - LimitMtx sync.RWMutex 41 - Limiters map[uint64]*Limiters 42 - 43 40 shutdownChan chan bool 44 41 shutdownResult chan error 45 42 46 43 logger *slog.Logger 47 44 } 48 45 49 - type Limiters struct { 46 + type StreamLimiters struct { 50 47 PerSecond *slidingwindow.Limiter 51 48 PerHour *slidingwindow.Limiter 52 49 PerDay *slidingwindow.Limiter 53 50 } 54 51 55 52 type SlurperConfig struct { 56 - UserAgent string 57 - DefaultPerSecondLimit int64 58 - DefaultPerHourLimit int64 59 - DefaultPerDayLimit int64 60 - DefaultRepoLimit int64 61 - ConcurrencyPerHost int 62 - QueueDepthPerHost int 63 - PersistCursorPeriod time.Duration 53 + UserAgent string 54 + ConcurrencyPerHost int 55 + QueueDepthPerHost int 56 + PersistCursorPeriod time.Duration 57 + 58 + BaselinePerSecondLimit int64 59 + BaselinePerHourLimit int64 60 + BaselinePerDayLimit int64 61 + TrustedPerSecondLimit int64 62 + TrustedPerHourLimit int64 63 + TrustedPerDayLimit int64 64 + 65 + // callback functions. technically optional but effectively required 64 66 PersistCursorCallback PersistCursorFunc 65 67 PersistHostStatusCallback PersistHostStatusFunc 66 68 } ··· 68 70 func DefaultSlurperConfig() *SlurperConfig { 69 71 // NOTE: many of these defaults are overruled by DefaultRelayConfig, or even process CLI arg defaults 70 72 return &SlurperConfig{ 71 - UserAgent: "indigo-relay", 72 - DefaultPerSecondLimit: 50, 73 - DefaultPerHourLimit: 2500, 74 - DefaultPerDayLimit: 20_000, 75 - DefaultRepoLimit: 100, 76 - ConcurrencyPerHost: 40, 77 - QueueDepthPerHost: 1000, 78 - PersistCursorPeriod: time.Second * 10, 73 + UserAgent: "indigo-relay", 74 + ConcurrencyPerHost: 40, 75 + QueueDepthPerHost: 1000, 76 + PersistCursorPeriod: time.Second * 4, 77 + 78 + // these are the minimum event rates for regular public hosts 79 + BaselinePerSecondLimit: 50, 80 + BaselinePerHourLimit: 2500, 81 + BaselinePerDayLimit: 20_000, 82 + 83 + // these are the fixed event rates for trusted hosts (eg, same service provider as relay) 84 + TrustedPerSecondLimit: 5_000, 85 + TrustedPerHourLimit: 50_000_000, 86 + TrustedPerDayLimit: 500_000_000, 79 87 } 80 88 } 81 89 ··· 84 92 Hostname string 85 93 HostID uint64 86 94 LastSeq atomic.Int64 87 - Limiters *Limiters // XXX: is this used? or only the separate limiters on Slurper? 95 + Limiters *StreamLimiters 88 96 89 97 lk sync.RWMutex 90 98 ctx context.Context ··· 116 124 processCallback: processCallback, 117 125 Config: config, 118 126 subs: make(map[string]*Subscription), 119 - Limiters: make(map[uint64]*Limiters), 120 127 shutdownChan: make(chan bool), 121 128 shutdownResult: make(chan error), 122 129 logger: logger, ··· 145 152 return slidingwindow.NewLocalWindow() 146 153 } 147 154 148 - func (s *Slurper) GetLimiters(hostID uint64) *Limiters { 149 - s.LimitMtx.RLock() 150 - defer s.LimitMtx.RUnlock() 151 - return s.Limiters[hostID] 152 - } 155 + func (s *Slurper) NewStreamLimiters(accountLimit int64, trusted bool) *StreamLimiters { 153 156 154 - func (s *Slurper) GetOrCreateLimiters(hostID uint64) *Limiters { 155 - s.LimitMtx.RLock() 156 - defer s.LimitMtx.RUnlock() 157 - lim, ok := s.Limiters[hostID] 158 - if !ok { 159 - perSec, _ := slidingwindow.NewLimiter(time.Second, s.Config.DefaultPerSecondLimit, windowFunc) 160 - perHour, _ := slidingwindow.NewLimiter(time.Hour, s.Config.DefaultPerHourLimit, windowFunc) 161 - perDay, _ := slidingwindow.NewLimiter(time.Hour*24, s.Config.DefaultPerDayLimit, windowFunc) 162 - lim = &Limiters{ 163 - PerSecond: perSec, 164 - PerHour: perHour, 165 - PerDay: perDay, 166 - } 167 - s.Limiters[hostID] = lim 157 + perSecondCount := s.Config.BaselinePerSecondLimit + (accountLimit / 1000) 158 + perHourCount := s.Config.BaselinePerHourLimit + accountLimit 159 + perDayCount := s.Config.BaselinePerDayLimit + accountLimit*10 160 + 161 + if trusted { 162 + perSecondCount = s.Config.TrustedPerSecondLimit 163 + perHourCount = s.Config.TrustedPerHourLimit 164 + perDayCount = s.Config.TrustedPerDayLimit 168 165 } 169 166 170 - return lim 167 + perSec, _ := slidingwindow.NewLimiter(time.Second, perSecondCount, windowFunc) 168 + perHour, _ := slidingwindow.NewLimiter(time.Hour, perHourCount, windowFunc) 169 + perDay, _ := slidingwindow.NewLimiter(time.Hour*24, perDayCount, windowFunc) 170 + return &StreamLimiters{ 171 + PerSecond: perSec, 172 + PerHour: perHour, 173 + PerDay: perDay, 174 + } 171 175 } 172 176 173 - func (s *Slurper) SetLimits(hostID uint64, perSecLimit int64, perHourLimit int64, perDayLimit int64) { 174 - s.LimitMtx.Lock() 175 - defer s.LimitMtx.Unlock() 176 - lim, ok := s.Limiters[hostID] 177 + func (s *Slurper) UpdateLimiters(hostname string, accountLimit int64, trusted bool) error { 178 + 179 + // easiest way to re-compute is generate a new set of limiters 180 + newLims := s.NewStreamLimiters(accountLimit, trusted) 181 + 182 + s.subsLk.Lock() 183 + defer s.subsLk.Unlock() 184 + 185 + sub, ok := s.subs[hostname] 177 186 if !ok { 178 - perSec, _ := slidingwindow.NewLimiter(time.Second, perSecLimit, windowFunc) 179 - perHour, _ := slidingwindow.NewLimiter(time.Hour, perHourLimit, windowFunc) 180 - perDay, _ := slidingwindow.NewLimiter(time.Hour*24, perDayLimit, windowFunc) 181 - lim = &Limiters{ 182 - PerSecond: perSec, 183 - PerHour: perHour, 184 - PerDay: perDay, 185 - } 186 - s.Limiters[hostID] = lim 187 + return fmt.Errorf("updating limits for %s: %w", hostname, ErrNoActiveConnection) 187 188 } 188 189 189 - lim.PerSecond.SetLimit(perSecLimit) 190 - lim.PerHour.SetLimit(perHourLimit) 191 - lim.PerDay.SetLimit(perDayLimit) 190 + sub.Limiters.PerSecond.SetLimit(newLims.PerSecond.Limit()) 191 + sub.Limiters.PerHour.SetLimit(newLims.PerHour.Limit()) 192 + sub.Limiters.PerDay.SetLimit(newLims.PerDay.Limit()) 193 + 194 + return nil 192 195 } 193 196 194 197 // Shutdown shuts down the slurper ··· 220 223 sub := Subscription{ 221 224 Hostname: host.Hostname, 222 225 HostID: host.ID, 226 + Limiters: s.NewStreamLimiters(host.AccountLimit, host.Trusted), 223 227 ctx: ctx, 224 228 cancel: cancel, 225 229 } 230 + sub.LastSeq.Store(host.LastSeq) 226 231 s.subs[host.Hostname] = &sub 227 - 228 - s.GetOrCreateLimiters(host.ID) 229 232 230 233 go s.subscribeWithRedialer(ctx, host, &sub, newHost) 231 234 ··· 266 269 } 267 270 268 271 u := host.SubscribeReposURL() 269 - if newHost { 272 + if !newHost { 270 273 u = fmt.Sprintf("%s?cursor=%d", u, cursor) 271 274 } 272 275 hdr := make(http.Header) ··· 434 437 }, 435 438 } 436 439 437 - lims := s.GetOrCreateLimiters(sub.HostID) 438 - 439 440 limiters := []*slidingwindow.Limiter{ 440 - lims.PerSecond, 441 - lims.PerHour, 442 - lims.PerDay, 441 + sub.Limiters.PerSecond, 442 + sub.Limiters.PerHour, 443 + sub.Limiters.PerDay, 443 444 } 444 445 445 446 // NOTE: this is where limiters get injected and enforced