this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

wire through admin rate-limit controls

+83 -45
+29 -32
cmd/relay/handlers_admin.go
··· 246 246 } 247 247 hostInfos[i].EventsSeenSinceStartup = uint64(m.Counter.GetValue()) 248 248 249 - /* TODO: compute these from account limit 250 - hostInfos[i].PerSecondEventRate = rateLimit{ 251 - Max: p.RateLimit, 252 - WindowSeconds: 1, 253 - } 254 - hostInfos[i].PerHourEventRate = rateLimit{ 255 - Max: float64(p.HourlyEventLimit), 256 - WindowSeconds: 3600, 249 + if isActive { 250 + slc, err := s.relay.Slurper.GetLimits(host.Hostname) 251 + if err != nil { 252 + hostInfos[i].PerSecondEventRate = rateLimit{ 253 + Max: float64(slc.PerSecond), 254 + WindowSeconds: 1, 255 + } 256 + hostInfos[i].PerHourEventRate = rateLimit{ 257 + Max: float64(slc.PerHour), 258 + WindowSeconds: 3600, 259 + } 260 + hostInfos[i].PerDayEventRate = rateLimit{ 261 + Max: float64(slc.PerDay), 262 + WindowSeconds: 86400, 263 + } 264 + } 257 265 } 258 - hostInfos[i].PerDayEventRate = rateLimit{ 259 - Max: float64(p.DailyEventLimit), 260 - WindowSeconds: 86400, 261 - } 262 - */ 263 266 } 264 267 265 268 return c.JSON(http.StatusOK, hostInfos) ··· 424 427 } 425 428 426 429 type RateLimitChangeRequest struct { 427 - Host string `json:"host"` 430 + Hostname string `json:"host"` 428 431 RepoLimit int64 `json:"repo_limit,omitempty"` 429 432 } 430 433 431 - /* XXX: finish rate limit stuff 432 434 func (s *Service) handleAdminChangeHostRateLimits(c echo.Context) error { 435 + ctx := c.Request().Context() 436 + 433 437 var body RateLimitChangeRequest 434 438 if err := c.Bind(&body); err != nil { 435 439 return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid body: %s", err)) 436 440 } 437 441 438 - var host models.Host 439 - if err := s.db.Where("host = ?", body.Host).First(&host).Error; err != nil { 440 - return err 442 + hostname, _, err := relay.ParseHostname(body.Hostname) 443 + if err != nil { 444 + return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid hostname: %s", err)) 441 445 } 442 446 443 - // Update the rate limits in the DB 444 - host.RateLimit = float64(body.PerSecond) 445 - host.HourlyEventLimit = body.PerHour 446 - host.DailyEventLimit = body.PerDay 447 - host.RepoLimit = body.RepoLimit 448 - 449 - if err := s.db.Save(&host).Error; err != nil { 450 - return echo.NewHTTPError(http.StatusInternalServerError, fmt.Errorf("failed to save rate limit changes: %w", err)) 447 + host, err := s.relay.GetHost(ctx, hostname) 448 + if err != nil { 449 + // TODO: technically, there could be a database error here or something 450 + return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("unknown hostname: %s", err)) 451 451 } 452 452 453 - // Update the rate limit in the limiter 454 - limits := s.relay.Slurper.GetOrCreateLimiters(host.ID, body.PerSecond, body.PerHour, body.PerDay) 455 - limits.PerSecond.SetLimit(body.PerSecond) 456 - limits.PerHour.SetLimit(body.PerHour) 457 - limits.PerDay.SetLimit(body.PerDay) 453 + if err := s.relay.UpdateHostAccountLimit(ctx, host.ID, body.RepoLimit); err != nil { 454 + return echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to update limits: %s", err)) 455 + } 458 456 459 457 return c.JSON(http.StatusOK, map[string]any{ 460 458 "success": "true", 461 459 }) 462 460 } 463 - */
+12 -8
cmd/relay/relay/account.go
··· 55 55 // Attempts creation of a new account associated with the given host, presumably because the account was discovered on that host's stream. 56 56 // 57 57 // If the account's identity doesn't match the host, this will fail. We only create accounts associated with hosts we already know of, not remote hosts (aka, no spidering). 58 - func (r *Relay) CreateHostAccount(ctx context.Context, did syntax.DID, hostID uint64, hostname string) (*models.Account, error) { 58 + func (r *Relay) CreateAccountHost(ctx context.Context, did syntax.DID, hostID uint64, hostname string) (*models.Account, error) { 59 59 // NOTE: this method doesn't use locking. the database UNIQUE constraint should prevent duplicate account creation. 60 60 logger := r.Logger.With("did", did, "hostname", hostname) 61 61 62 - //newUsersDiscovered.Inc() 62 + newUsersDiscovered.Inc() 63 63 //start := time.Now() 64 64 65 65 ident, err := r.Dir.LookupDID(ctx, did) ··· 83 83 } 84 84 } 85 85 86 - // TODO: fetch the full host, apply throttling or rate-limits? 86 + // TODO: could be verifying upstream status here (using r.HostChecker); not particularly urgent because triggering event is already coming from the relevant host 87 87 88 - // TODO: could be verifying upstream status here (using r.HostChecker) 89 - // XXX: limits/throttling; reach in to Slurper? 90 88 acc := models.Account{ 91 89 DID: did.String(), 92 90 HostID: hostID, 93 91 Status: models.AccountStatusActive, 94 92 UpstreamStatus: models.AccountStatusActive, 93 + } 94 + 95 + host, err := r.GetHostByID(ctx, hostID) 96 + if err != nil { 97 + return nil, err 98 + } 99 + if host.AccountCount >= host.AccountLimit { 100 + acc.Status = models.AccountStatusHostThrottled 95 101 } 96 102 97 103 // create Account row and increment host count in the same transaction ··· 147 153 } 148 154 } 149 155 150 - // TODO: could check upstream status here (using r.HostChecker) 151 - // TODO: for example, a moved account might go from takendown to active 152 - // XXX: limits/throttling; read in to Slurper? 156 + // TODO: check new upstream status here (using r.HostChecker). In particular, a moved account might go from takendown to active 153 157 154 158 // create Account row and increment host count in the same transaction 155 159 err = r.db.Transaction(func(tx *gorm.DB) error {
+1 -1
cmd/relay/relay/domain_ban.go
··· 11 11 "gorm.io/gorm" 12 12 ) 13 13 14 - // XXX: tests for domain ban logic (which hit an actual database) 14 + // TODO: tests for domain ban logic (which hit an actual database) 15 15 16 16 // DomainIsBanned checks if the given hostname is banned. It checks all domain suffixs. 17 17 //
+7
cmd/relay/relay/host.go
··· 69 69 70 70 func (r *Relay) UpdateHostAccountLimit(ctx context.Context, hostID uint64, accountLimit int64) error { 71 71 72 + if accountLimit < 0 { 73 + return fmt.Errorf("negative account limit") 74 + } 75 + 72 76 host, err := r.GetHostByID(ctx, hostID) 73 77 if err != nil { 74 78 return err 75 79 } 76 80 81 + // TODO: manage accounts marked as "host-throttled" when host-level account limits change (all in a transaction) 82 + // If limit increased: potentially mark some "host-throttled" accounts as "active" (ordered by UID ascending) 83 + // If limit decreased: potentially mark some "active" accounts as "host-throttled" (ordered by UID descending?) 77 84 if err := r.db.Model(models.Host{}).Where("id = ?", hostID).Update("account_limit", accountLimit).Error; err != nil { 78 85 return err 79 86 }
+4 -4
cmd/relay/relay/ingest.go
··· 75 75 return nil, nil, fmt.Errorf("fetching account: %w", err) 76 76 } 77 77 78 - acc, err = r.CreateHostAccount(ctx, did, hostID, hostname) 78 + acc, err = r.CreateAccountHost(ctx, did, hostID, hostname) 79 79 if err != nil { 80 80 return nil, nil, err 81 81 } ··· 83 83 84 84 if acc == nil { 85 85 // TODO: this is defensive and could be removed 86 - panic(ErrAccountNotFound) 86 + return nil, nil, ErrAccountNotFound 87 87 } 88 88 89 89 // verify that the account is on the subscribed host (or update if it should be) ··· 118 118 } 119 119 120 120 if ident == nil { 121 - // XXX: what to do if identity resolution fails 121 + // TODO: what to do if identity resolution fails 122 122 } 123 123 124 124 prevRepo, err := r.GetAccountRepo(ctx, acc.UID) ··· 168 168 } 169 169 170 170 if ident == nil { 171 - // XXX: what to do if identity resolution fails 171 + // TODO: what to do if identity resolution fails 172 172 } 173 173 174 174 newRepo, err := r.VerifyRepoSync(ctx, evt, ident, hostname)
+3
cmd/relay/relay/models/methods.go
··· 24 24 25 25 func (a *Account) AccountStatus() AccountStatus { 26 26 if a.Status != AccountStatusActive { 27 + if a.Status == AccountStatusHostThrottled { 28 + return AccountStatusThrottled 29 + } 27 30 return a.Status 28 31 } 29 32 return a.UpstreamStatus
+27
cmd/relay/relay/slurper.go
··· 49 49 PerDay *slidingwindow.Limiter 50 50 } 51 51 52 + type StreamLimiterCounts struct { 53 + PerSecond int64 54 + PerHour int64 55 + PerDay int64 56 + } 57 + 58 + func (sl *StreamLimiters) Counts() StreamLimiterCounts { 59 + return StreamLimiterCounts{ 60 + PerSecond: sl.PerSecond.Limit(), 61 + PerHour: sl.PerHour.Limit(), 62 + PerDay: sl.PerDay.Limit(), 63 + } 64 + } 65 + 52 66 type SlurperConfig struct { 53 67 UserAgent string 54 68 ConcurrencyPerHost int ··· 192 206 sub.Limiters.PerDay.SetLimit(newLims.PerDay.Limit()) 193 207 194 208 return nil 209 + } 210 + 211 + func (s *Slurper) GetLimits(hostname string) (*StreamLimiterCounts, error) { 212 + s.subsLk.Lock() 213 + defer s.subsLk.Unlock() 214 + 215 + sub, ok := s.subs[hostname] 216 + if !ok { 217 + return nil, fmt.Errorf("reading limits for %s: %w", hostname, ErrNoActiveConnection) 218 + } 219 + 220 + slc := sub.Limiters.Counts() 221 + return &slc, nil 195 222 } 196 223 197 224 // Shutdown shuts down the slurper