this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

move host-per-day limiter from slurper to relay

+59 -33
+2 -2
cmd/relay/handlers_admin.go
··· 48 48 49 49 func (s *Service) handleAdminGetNewHostPerDayRateLimit(c echo.Context) error { 50 50 return c.JSON(http.StatusOK, map[string]int64{ 51 - "limit": s.relay.Slurper.NewHostPerDayLimiter.Limit(), 51 + "limit": s.relay.HostPerDayLimiter.Limit(), 52 52 }) 53 53 } 54 54 ··· 58 58 return &echo.HTTPError{Code: http.StatusBadRequest, Message: fmt.Errorf("failed to parse limit: %w", err).Error()} 59 59 } 60 60 61 - s.relay.Slurper.NewHostPerDayLimiter.SetLimit(limit) 61 + s.relay.HostPerDayLimiter.SetLimit(limit) 62 62 63 63 // TODO: forward to SiblingRelayHosts 64 64 return c.JSON(http.StatusOK, map[string]any{
+1
cmd/relay/main.go
··· 233 233 } 234 234 235 235 relayConfig := relay.DefaultRelayConfig() 236 + relayConfig.UserAgent = fmt.Sprintf("indigo-relay/%s", versioninfo.Short()) 236 237 relayConfig.ConcurrencyPerHost = cctx.Int("host-concurrency") 237 238 relayConfig.QueueDepthPerHost = cctx.Int("host-queue-depth") 238 239 relayConfig.DefaultRepoLimit = cctx.Int64("default-account-limit")
+1 -1
cmd/relay/relay/crawl.go
··· 12 12 // Must be called with the slurper lock held 13 13 func (r *Relay) canSlurpHost(hostname string) bool { 14 14 // Check if we're over the limit for new hosts today 15 - if !r.Slurper.NewHostPerDayLimiter.Allow() { 15 + if !r.HostPerDayLimiter.Allow() { 16 16 return false 17 17 } 18 18
+16 -8
cmd/relay/relay/models/models.go
··· 22 22 ) 23 23 24 24 type Host struct { 25 - ID uint64 `gorm:"column:id;primarykey"` 25 + ID uint64 `gorm:"column:id;primarykey"` 26 + 27 + // these fields are automatically managed by gorm (by convention) 26 28 CreatedAt time.Time 27 29 UpdatedAt time.Time 28 30 ··· 37 39 38 40 // TODO: ThrottleUntil time.Time 39 41 40 - // indicates this is a highly trusted host (PDS) and different limits apply 42 + // indicates this is a highly trusted host (PDS), and different rate limits apply 41 43 Trusted bool `gorm:"column:trusted;default:false"` 42 44 43 - // enum of account status 44 45 Status HostStatus `gorm:"column:status;default:active"` 45 46 46 - // negative number indicates no sequence recorded 47 - LastSeq int64 `gorm:"column:last_seq"` 48 - AccountCount int64 `gorm:"column:account_count"` 47 + // the last sequence number persisted for this host. updated periodically, and at shutdown. negative number indicates no sequence recorded 48 + LastSeq int64 `gorm:"column:last_seq;default:-1"` 49 + 50 + // represents the number of accounts on the host, minus any in "deleted" state 51 + AccountCount int64 `gorm:"column:account_count;default:0"` 49 52 } 50 53 51 54 func (Host) TableName() string { ··· 71 74 ) 72 75 73 76 type Account struct { 74 - UID uint64 `gorm:"column:uid;primarykey"` 75 - DID string `gorm:"column:did;uniqueIndex;not null"` 77 + UID uint64 `gorm:"column:uid;primarykey"` 78 + DID string `gorm:"column:did;uniqueIndex;not null"` 79 + 80 + // this is a reference to the ID field on Host; but it is not an explicit foreign key 76 81 HostID uint64 `gorm:"column:host_id;not null"` 77 82 Status AccountStatus `gorm:"column:status;default:active"` 78 83 UpstreamStatus AccountStatus `gorm:"column:upstream_status;default:active"` ··· 85 90 86 91 // This is a small extension table to `Account`, which holds fast-changing fields updated on every firehose event. 87 92 type AccountRepo struct { 93 + // references Account.UID, but not set up as a foreign key 88 94 UID uint64 `gorm:"column:uid;primarykey"` 89 95 Rev string `gorm:"column:rev;not null"` 96 + 90 97 // The CID of the entire signed commit block. Sometimes called the "head" 91 98 CommitCID string `gorm:"column:commit_cid;not null"` 99 + 92 100 // The CID of the top of the repo MST, which is the 'data' field within the commit block. This becomes 'prevData' 93 101 CommitData string `gorm:"column:commit_data;not null"` 94 102 }
+12 -6
cmd/relay/relay/relay.go
··· 8 8 "github.com/bluesky-social/indigo/cmd/relay/relay/models" 9 9 "github.com/bluesky-social/indigo/cmd/relay/stream/eventmgr" 10 10 11 + "github.com/RussellLuo/slidingwindow" 11 12 "github.com/hashicorp/golang-lru/v2" 12 13 "go.opentelemetry.io/otel" 13 14 "gorm.io/gorm" ··· 24 25 HostChecker HostChecker 25 26 Config RelayConfig 26 27 27 - // accountLk serializes a section of syncHostAccount() 28 - // TODO: at some point we will want to lock specific DIDs, this lock as is 29 - // is overly broad, but i dont expect it to be a bottleneck for now 30 - accountLk sync.Mutex 31 - 32 28 // Management of Socket Consumers 33 29 consumersLk sync.RWMutex 34 30 nextConsumerID uint64 ··· 36 32 37 33 // Account cache 38 34 accountCache *lru.Cache[string, *models.Account] 35 + 36 + HostPerDayLimiter *slidingwindow.Limiter 39 37 } 40 38 41 39 type RelayConfig struct { 40 + UserAgent string 42 41 DefaultRepoLimit int64 43 42 ConcurrencyPerHost int 44 43 QueueDepthPerHost int 45 44 LenientSyncValidation bool 46 45 TrustedDomains []string 46 + HostPerDayLimit int64 47 47 48 48 // If true, skip validation that messages for a given account (DID) are coming from the expected upstream host (PDS). Currently only used in tests; might be used for intermediate relays in the future. 49 49 SkipAccountHostCheck bool ··· 52 52 func DefaultRelayConfig() *RelayConfig { 53 53 // NOTE: many of these defaults are clobbered by CLI arguments 54 54 return &RelayConfig{ 55 + UserAgent: "indigo-relay", 55 56 DefaultRepoLimit: 100, 56 57 ConcurrencyPerHost: 40, 57 58 QueueDepthPerHost: 1000, 59 + HostPerDayLimit: 50, 58 60 } 59 61 } 60 62 ··· 66 68 67 69 uc, _ := lru.New[string, *models.Account](2_000_000) 68 70 69 - hc := NewHostClient("relay") // TODO: pass-through a user-agent from config? 71 + hc := NewHostClient(config.UserAgent) 72 + 73 + // NOTE: discarded second argument is not an `error` type 70 74 71 75 r := &Relay{ 72 76 db: db, ··· 80 84 consumers: make(map[uint64]*SocketConsumer), 81 85 82 86 accountCache: uc, 87 + 88 + HostPerDayLimiter: perDayLimiter(config.HostPerDayLimit), 83 89 } 84 90 85 91 if err := r.MigrateDatabase(); err != nil {
+13 -16
cmd/relay/relay/slurper.go
··· 6 6 "fmt" 7 7 "log/slog" 8 8 "math/rand" 9 + "net/http" 9 10 "sync" 10 11 "sync/atomic" 11 12 "time" ··· 39 40 LimitMtx sync.RWMutex 40 41 Limiters map[uint64]*Limiters 41 42 42 - NewHostPerDayLimiter *slidingwindow.Limiter 43 - 44 43 shutdownChan chan bool 45 44 shutdownResult chan error 46 45 ··· 54 53 } 55 54 56 55 type SlurperConfig struct { 56 + UserAgent string 57 57 DefaultPerSecondLimit int64 58 58 DefaultPerHourLimit int64 59 59 DefaultPerDayLimit int64 60 60 DefaultRepoLimit int64 61 61 ConcurrencyPerHost int 62 62 QueueDepthPerHost int 63 - NewHostPerDayLimit int64 64 63 PersistCursorPeriod time.Duration 65 64 PersistCursorCallback PersistCursorFunc 66 65 PersistHostStatusCallback PersistHostStatusFunc ··· 69 68 func DefaultSlurperConfig() *SlurperConfig { 70 69 // NOTE: many of these defaults are overruled by DefaultRelayConfig, or even process CLI arg defaults 71 70 return &SlurperConfig{ 72 - NewHostPerDayLimit: 50, 71 + UserAgent: "indigo-relay", 73 72 DefaultPerSecondLimit: 50, 74 73 DefaultPerHourLimit: 2500, 75 74 DefaultPerDayLimit: 20_000, ··· 113 112 logger = slog.Default() 114 113 } 115 114 116 - // NOTE: discarded second argument is not an `error` type 117 - newHostPerDayLimiter, _ := slidingwindow.NewLimiter(time.Hour*24, config.NewHostPerDayLimit, windowFunc) 118 - 119 115 s := &Slurper{ 120 - processCallback: processCallback, 121 - Config: config, 122 - subs: make(map[string]*Subscription), 123 - Limiters: make(map[uint64]*Limiters), 124 - shutdownChan: make(chan bool), 125 - shutdownResult: make(chan error), 126 - NewHostPerDayLimiter: newHostPerDayLimiter, 127 - logger: logger, 116 + processCallback: processCallback, 117 + Config: config, 118 + subs: make(map[string]*Subscription), 119 + Limiters: make(map[uint64]*Limiters), 120 + shutdownChan: make(chan bool), 121 + shutdownResult: make(chan error), 122 + logger: logger, 128 123 } 129 124 130 125 // Start a goroutine to persist cursors (both periodically and and on shutdown) ··· 274 269 if newHost { 275 270 u = fmt.Sprintf("%s?cursor=%d", u, cursor) 276 271 } 277 - conn, res, err := d.DialContext(ctx, u, nil) 272 + hdr := make(http.Header) 273 + hdr.Add("User-Agent", s.Config.UserAgent) 274 + conn, res, err := d.DialContext(ctx, u, hdr) 278 275 if err != nil { 279 276 s.logger.Warn("dialing failed", "host", host.Hostname, "err", err, "backoff", backoff) 280 277 time.Sleep(sleepForBackoff(backoff))
+14
cmd/relay/relay/util.go
··· 1 + package relay 2 + 3 + import ( 4 + "time" 5 + 6 + "github.com/RussellLuo/slidingwindow" 7 + ) 8 + 9 + func perDayLimiter(count int64) *slidingwindow.Limiter { 10 + lim, _ := slidingwindow.NewLimiter(time.Hour*24, count, func() (slidingwindow.Window, slidingwindow.StopFunc) { 11 + return slidingwindow.NewLocalWindow() 12 + }) 13 + return lim 14 + }