this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

cleanup rate-limit code

+94 -77
+26 -27
cmd/relay/handlers_admin.go
··· 179 179 180 180 type hostInfo struct { 181 181 // fields from old models.PDS 182 - ID uint64 183 - CreatedAt time.Time 184 - Host string 185 - SSL bool 186 - Cursor int64 187 - Registered bool 188 - Blocked bool 189 - RateLimit float64 190 - CrawlRateLimit float64 191 - RepoCount int64 192 - RepoLimit int64 193 - HourlyEventLimit int64 194 - DailyEventLimit int64 182 + ID uint64 183 + CreatedAt time.Time 184 + Host string 185 + SSL bool 186 + Cursor int64 187 + Registered bool 188 + Blocked bool 189 + CrawlRateLimit float64 190 + RepoCount int64 191 + RepoLimit int64 195 192 196 193 HasActiveConnection bool `json:"HasActiveConnection"` 197 194 EventsSeenSinceStartup uint64 `json:"EventsSeenSinceStartup"` ··· 227 224 Cursor: host.LastSeq, 228 225 Registered: host.Status == models.HostStatusActive, // is this right? 229 226 Blocked: host.Status == models.HostStatusBanned, 230 - //TODO: RateLimit 231 - //TODO: CrawlRateLimit 232 - RepoCount: host.AccountCount, 233 - RepoLimit: host.AccountLimit, 234 - //HourlyEventLimit 235 - //DailyEventLimit 227 + RepoCount: host.AccountCount, 228 + RepoLimit: host.AccountLimit, 236 229 237 230 HasActiveConnection: isActive, 238 231 UserCount: host.AccountCount, 239 232 } 240 233 241 - // pull event counter metrics from prometheus 242 - var m = &dto.Metric{} 243 - if err := relay.EventsReceivedCounter.WithLabelValues(host.Hostname).Write(m); err != nil { 244 - hostInfos[i].EventsSeenSinceStartup = 0 245 - continue 246 - } 247 - hostInfos[i].EventsSeenSinceStartup = uint64(m.Counter.GetValue()) 248 - 234 + // fetch current rate limits 235 + hostInfos[i].PerSecondEventRate = rateLimit{Max: -1.0, WindowSeconds: 1} 236 + hostInfos[i].PerHourEventRate = rateLimit{Max: -1.0, WindowSeconds: 3600} 237 + hostInfos[i].PerDayEventRate = rateLimit{Max: -1.0, WindowSeconds: 86400} 249 238 if isActive { 250 239 slc, err := s.relay.Slurper.GetLimits(host.Hostname) 251 240 if err != nil { 241 + s.logger.Error("fetching subscribed host limits", "err", err) 242 + } else { 252 243 hostInfos[i].PerSecondEventRate = rateLimit{ 253 244 Max: float64(slc.PerSecond), 254 245 WindowSeconds: 1, ··· 263 254 } 264 255 } 265 256 } 257 + 258 + // pull event counter metrics from prometheus 259 + var m = &dto.Metric{} 260 + if err := relay.EventsReceivedCounter.WithLabelValues(host.Hostname).Write(m); err != nil { 261 + hostInfos[i].EventsSeenSinceStartup = 0 262 + continue 263 + } 264 + hostInfos[i].EventsSeenSinceStartup = uint64(m.Counter.GetValue()) 266 265 } 267 266 268 267 return c.JSON(http.StatusOK, hostInfos)
+68 -50
cmd/relay/relay/slurper.go
··· 27 27 type PersistCursorFunc func(ctx context.Context, cursors *[]HostCursor) error 28 28 type PersistHostStatusFunc func(ctx context.Context, hostID uint64, state models.HostStatus) error 29 29 30 - // `Slurper` is the sub-system of the relay which manages active websocket firehose connections to upstream hosts. 30 + // `Slurper` is the sub-system of the relay which manages active websocket firehose connections to upstream hosts (eg, PDS instances). 31 31 // 32 - // It enforces rate-limits, tracks cursors, and retries connections. It passes recieved messages on to the main relay via a callback function. `Slurper` does not talk to the database directly, but does have some callback to persist host state (cursors and hosting status for some error conditions). 32 + // It configures rate-limits, tracks cursors, and retries connections. It passes recieved messages on to the main relay via a callback function. `Slurper` does not talk to the database directly, but does have some callback to persist host state (cursors and hosting status for some error conditions). 33 33 type Slurper struct { 34 34 processCallback ProcessMessageFunc 35 35 Config *SlurperConfig ··· 41 41 shutdownResult chan error 42 42 43 43 logger *slog.Logger 44 - } 45 - 46 - type StreamLimiters struct { 47 - PerSecond *slidingwindow.Limiter 48 - PerHour *slidingwindow.Limiter 49 - PerDay *slidingwindow.Limiter 50 - } 51 - 52 - type StreamLimiterCounts struct { 53 - PerSecond int64 54 - PerHour int64 55 - PerDay int64 56 - } 57 - 58 - func (sl *StreamLimiters) Counts() StreamLimiterCounts { 59 - return StreamLimiterCounts{ 60 - PerSecond: sl.PerSecond.Limit(), 61 - PerHour: sl.PerHour.Limit(), 62 - PerDay: sl.PerDay.Limit(), 63 - } 64 44 } 65 45 66 46 type SlurperConfig struct { ··· 126 106 } 127 107 } 128 108 109 + type StreamLimiterCounts struct { 110 + PerSecond int64 111 + PerHour int64 112 + PerDay int64 113 + } 114 + 115 + type StreamLimiters struct { 116 + PerSecond *slidingwindow.Limiter 117 + PerHour *slidingwindow.Limiter 118 + PerDay *slidingwindow.Limiter 119 + } 120 + 121 + func (sl *StreamLimiters) Counts() StreamLimiterCounts { 122 + return StreamLimiterCounts{ 123 + PerSecond: sl.PerSecond.Limit(), 124 + PerHour: sl.PerHour.Limit(), 125 + PerDay: sl.PerDay.Limit(), 126 + } 127 + } 128 + 129 129 func NewSlurper(processCallback ProcessMessageFunc, config *SlurperConfig, logger *slog.Logger) (*Slurper, error) { 130 + if processCallback == nil { 131 + return nil, fmt.Errorf("processCallback is required") 132 + } 130 133 if config == nil { 131 134 config = DefaultSlurperConfig() 132 135 } ··· 166 169 return slidingwindow.NewLocalWindow() 167 170 } 168 171 169 - func (s *Slurper) NewStreamLimiters(accountLimit int64, trusted bool) *StreamLimiters { 170 - 171 - perSecondCount := s.Config.BaselinePerSecondLimit + (accountLimit / 1000) 172 - perHourCount := s.Config.BaselinePerHourLimit + accountLimit 173 - perDayCount := s.Config.BaselinePerDayLimit + accountLimit*10 174 - 172 + func (s *Slurper) ComputeLimiterCounts(accountLimit int64, trusted bool) StreamLimiterCounts { 175 173 if trusted { 176 - perSecondCount = s.Config.TrustedPerSecondLimit 177 - perHourCount = s.Config.TrustedPerHourLimit 178 - perDayCount = s.Config.TrustedPerDayLimit 174 + return StreamLimiterCounts{ 175 + PerSecond: s.Config.TrustedPerSecondLimit, 176 + PerHour: s.Config.TrustedPerHourLimit, 177 + PerDay: s.Config.TrustedPerDayLimit, 178 + } 179 179 } 180 - 181 - perSec, _ := slidingwindow.NewLimiter(time.Second, perSecondCount, windowFunc) 182 - perHour, _ := slidingwindow.NewLimiter(time.Hour, perHourCount, windowFunc) 183 - perDay, _ := slidingwindow.NewLimiter(time.Hour*24, perDayCount, windowFunc) 184 - return &StreamLimiters{ 185 - PerSecond: perSec, 186 - PerHour: perHour, 187 - PerDay: perDay, 180 + return StreamLimiterCounts{ 181 + PerSecond: s.Config.BaselinePerSecondLimit + (accountLimit / 1000), 182 + PerHour: s.Config.BaselinePerHourLimit + accountLimit, 183 + PerDay: s.Config.BaselinePerDayLimit + accountLimit*10, 188 184 } 189 185 } 190 186 191 187 func (s *Slurper) UpdateLimiters(hostname string, accountLimit int64, trusted bool) error { 192 188 193 - // easiest way to re-compute is generate a new set of limiters 194 - newLims := s.NewStreamLimiters(accountLimit, trusted) 189 + newLims := s.ComputeLimiterCounts(accountLimit, trusted) 195 190 196 191 s.subsLk.Lock() 197 192 defer s.subsLk.Unlock() ··· 201 196 return fmt.Errorf("updating limits for %s: %w", hostname, ErrNoActiveConnection) 202 197 } 203 198 204 - sub.Limiters.PerSecond.SetLimit(newLims.PerSecond.Limit()) 205 - sub.Limiters.PerHour.SetLimit(newLims.PerHour.Limit()) 206 - sub.Limiters.PerDay.SetLimit(newLims.PerDay.Limit()) 199 + sub.Limiters.PerSecond.SetLimit(newLims.PerSecond) 200 + sub.Limiters.PerHour.SetLimit(newLims.PerHour) 201 + sub.Limiters.PerDay.SetLimit(newLims.PerDay) 207 202 208 203 return nil 209 204 } ··· 221 216 return &slc, nil 222 217 } 223 218 224 - // Shutdown shuts down the slurper 219 + // Shutdown shuts down the entire Slurper (all subscriptions) 225 220 func (s *Slurper) Shutdown() error { 226 221 s.shutdownChan <- true 227 222 s.logger.Info("waiting for slurper shutdown") ··· 241 236 return ok 242 237 } 243 238 239 + // high-level entry point for opening a subscription (websocket connection). This might be called when adding a new host, or when re-connecting to a previously subscribed host. 240 + // 241 + // NOTE: the `host` parameter (a database row) contains metadata about the host at a point in time. Subsequent changes to the database aren't reflected in that struct, and changes to the struct don't get persisted to database. 244 242 func (s *Slurper) Subscribe(host *models.Host, newHost bool) error { 245 - // TODO: for performance, lock on the hostname instead of global 243 + // TODO: replace newHost with a check for negative number on host.LastSeq (via IsNewHost helper method on `models.Host`?) 246 244 s.subsLk.Lock() 247 245 defer s.subsLk.Unlock() 248 246 247 + _, ok := s.subs[host.Hostname] 248 + if ok { 249 + return fmt.Errorf("already subscribed: %s", host.Hostname) 250 + } 251 + 252 + counts := s.ComputeLimiterCounts(host.AccountLimit, host.Trusted) 253 + perSec, _ := slidingwindow.NewLimiter(time.Second, counts.PerSecond, windowFunc) 254 + perHour, _ := slidingwindow.NewLimiter(time.Hour, counts.PerHour, windowFunc) 255 + perDay, _ := slidingwindow.NewLimiter(time.Hour*24, counts.PerDay, windowFunc) 256 + limiters := &StreamLimiters{ 257 + PerSecond: perSec, 258 + PerHour: perHour, 259 + PerDay: perDay, 260 + } 261 + 249 262 ctx, cancel := context.WithCancel(context.Background()) 250 263 sub := Subscription{ 251 264 Hostname: host.Hostname, 252 265 HostID: host.ID, 253 - Limiters: s.NewStreamLimiters(host.AccountLimit, host.Trusted), 266 + Limiters: limiters, 254 267 ctx: ctx, 255 268 cancel: cancel, 256 269 } ··· 262 275 return nil 263 276 } 264 277 278 + // Main event-loop for a subscription (websocket connection to upstream host), expected to be called as a goroutine. 279 + // 280 + // On connection failure (drop or failed initial connection), will attempt re-connects, with backoff. 265 281 func (s *Slurper) subscribeWithRedialer(ctx context.Context, host *models.Host, sub *Subscription, newHost bool) { 266 282 defer func() { 267 283 s.subsLk.Lock() ··· 274 290 HandshakeTimeout: time.Second * 5, 275 291 } 276 292 277 - // cursor by 200 events to smooth over unclean shutdowns 293 + // HACK: cursor by 200 events to smooth over unclean shutdowns. This has been in place since 2024. 278 294 if host.LastSeq > 200 { 279 295 host.LastSeq -= 200 280 296 } else { ··· 285 301 286 302 connectedInbound.Inc() 287 303 defer connectedInbound.Dec() 288 - // TODO:? maybe keep a gauge of 'in retry backoff' sources? 304 + // TODO: add a metric for number of subscriptions which are attempting to reconnect 289 305 290 306 var backoff int 291 307 for { ··· 348 364 return time.Second * 30 349 365 } 350 366 367 + // Configures event processing for a websocket connection, using the parallel schedule helper library, with all events processed using the configured callback function. 351 368 func (s *Slurper) handleConnection(ctx context.Context, conn *websocket.Conn, lastCursor *int64, sub *Subscription) error { 352 369 ctx, cancel := context.WithCancel(ctx) 353 370 defer cancel() ··· 470 487 sub.Limiters.PerDay, 471 488 } 472 489 473 - // NOTE: this is where limiters get injected and enforced 490 + // NOTE: `InstrumentedRepoStreamCallbacks` is where event limiters get called/enforced 474 491 instrumentedRSC := stream.NewInstrumentedRepoStreamCallbacks(limiters, rsc.EventHandler) 475 492 476 493 pool := parallel.NewScheduler( ··· 513 530 return err 514 531 } 515 532 533 + // gets a snapshot of current subsription hostnames 516 534 func (s *Slurper) GetActiveSubHostnames() []string { 517 535 s.subsLk.Lock() 518 536 defer s.subsLk.Unlock()