this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

remove dynamic relay config

+47 -178
+4 -4
cmd/relayered/relay/account.go
··· 174 174 // could check other things, a valid response is good enough for now 175 175 canonicalHost.Host = durl.Host 176 176 canonicalHost.SSL = (durl.Scheme == "https") 177 - canonicalHost.RateLimit = float64(r.Slurper.DefaultPerSecondLimit) 178 - canonicalHost.HourlyEventLimit = r.Slurper.DefaultPerHourLimit 179 - canonicalHost.DailyEventLimit = r.Slurper.DefaultPerDayLimit 180 - canonicalHost.RepoLimit = r.Slurper.DefaultRepoLimit 177 + canonicalHost.RateLimit = float64(r.Slurper.Config.DefaultPerSecondLimit) 178 + canonicalHost.HourlyEventLimit = r.Slurper.Config.DefaultPerHourLimit 179 + canonicalHost.DailyEventLimit = r.Slurper.Config.DefaultPerDayLimit 180 + canonicalHost.RepoLimit = r.Slurper.Config.DefaultRepoLimit 181 181 182 182 if r.Config.SSL && !canonicalHost.SSL { 183 183 return nil, fmt.Errorf("did references non-ssl PDS, this is disallowed in prod: %q %q", did, pdsRelay.URL)
+2 -3
cmd/relayered/relay/relay.go
··· 84 84 return nil, err 85 85 } 86 86 87 - slOpts := slurper.DefaultSlurperOptions() 87 + slOpts := slurper.DefaultSlurperConfig() 88 88 slOpts.SSL = config.SSL 89 89 slOpts.DefaultRepoLimit = config.DefaultRepoLimit 90 90 slOpts.ConcurrencyPerPDS = config.ConcurrencyPerPDS 91 91 slOpts.MaxQueuePerPDS = config.MaxQueuePerPDS 92 - slOpts.Logger = r.Logger 93 - s, err := slurper.NewSlurper(db, r.handleFedEvent, slOpts) 92 + s, err := slurper.NewSlurper(db, r.handleFedEvent, slOpts, r.Logger) 94 93 if err != nil { 95 94 return nil, err 96 95 }
+4 -4
cmd/relayered/relay/slurper/rate_limits.go
··· 11 11 12 12 func (pr *PDSRates) FromSlurper(s *Slurper) { 13 13 if pr.PerSecond == 0 { 14 - pr.PerHour = s.DefaultPerSecondLimit 14 + pr.PerHour = s.Config.DefaultPerSecondLimit 15 15 } 16 16 if pr.PerHour == 0 { 17 - pr.PerHour = s.DefaultPerHourLimit 17 + pr.PerHour = s.Config.DefaultPerHourLimit 18 18 } 19 19 if pr.PerDay == 0 { 20 - pr.PerDay = s.DefaultPerDayLimit 20 + pr.PerDay = s.Config.DefaultPerDayLimit 21 21 } 22 22 if pr.RepoLimit == 0 { 23 - pr.RepoLimit = s.DefaultRepoLimit 23 + pr.RepoLimit = s.Config.DefaultRepoLimit 24 24 } 25 25 }
+37 -167
cmd/relayered/relay/slurper/slurper.go
··· 16 16 "github.com/bluesky-social/indigo/cmd/relayered/stream/schedulers/parallel" 17 17 18 18 "github.com/gorilla/websocket" 19 - pq "github.com/lib/pq" 20 19 "gorm.io/gorm" 21 20 ) 22 21 23 22 type IndexCallback func(context.Context, *PDS, *stream.XRPCStreamEvent) error 24 23 25 24 type Slurper struct { 26 - cb IndexCallback 27 - 28 - db *gorm.DB 25 + cb IndexCallback 26 + db *gorm.DB 27 + Config *SlurperConfig 29 28 30 29 lk sync.Mutex 31 30 active map[string]*activeSub 32 31 33 - LimitMux sync.RWMutex 34 - Limiters map[uint]*Limiters 35 - DefaultPerSecondLimit int64 36 - DefaultPerHourLimit int64 37 - DefaultPerDayLimit int64 32 + LimitMux sync.RWMutex 33 + Limiters map[uint]*Limiters 38 34 39 35 DefaultRepoLimit int64 40 36 ConcurrencyPerPDS int64 41 37 MaxQueuePerPDS int64 42 38 43 39 NewPDSPerDayLimiter *slidingwindow.Limiter 44 - 45 - newSubsDisabled bool 46 - trustedDomains []string 47 40 48 41 shutdownChan chan bool 49 42 shutdownResult chan []error 50 43 51 - ssl bool 52 - 53 44 log *slog.Logger 54 45 } 55 46 ··· 59 50 PerDay *slidingwindow.Limiter 60 51 } 61 52 62 - type SlurperOptions struct { 53 + type SlurperConfig struct { 63 54 SSL bool 64 55 DefaultPerSecondLimit int64 65 56 DefaultPerHourLimit int64 ··· 67 58 DefaultRepoLimit int64 68 59 ConcurrencyPerPDS int64 69 60 MaxQueuePerPDS int64 70 - 71 - Logger *slog.Logger 61 + NewSubsDisabled bool 62 + TrustedDomains []string 63 + NewPDSPerDayLimit int64 72 64 } 73 65 74 - func DefaultSlurperOptions() *SlurperOptions { 75 - return &SlurperOptions{ 66 + func DefaultSlurperConfig() *SlurperConfig { 67 + return &SlurperConfig{ 76 68 SSL: false, 77 69 DefaultPerSecondLimit: 50, 78 70 DefaultPerHourLimit: 2500, ··· 80 72 DefaultRepoLimit: 100, 81 73 ConcurrencyPerPDS: 100, 82 74 MaxQueuePerPDS: 1_000, 83 - 84 - Logger: slog.Default(), 85 75 } 86 76 } 87 77 ··· 98 88 sub.pds.Cursor = curs 99 89 } 100 90 101 - func NewSlurper(db *gorm.DB, cb IndexCallback, opts *SlurperOptions) (*Slurper, error) { 102 - if opts == nil { 103 - opts = DefaultSlurperOptions() 91 + func NewSlurper(db *gorm.DB, cb IndexCallback, config *SlurperConfig, logger *slog.Logger) (*Slurper, error) { 92 + if config == nil { 93 + config = DefaultSlurperConfig() 104 94 } 105 - err := db.AutoMigrate(&SlurpConfig{}) 106 - if err != nil { 107 - return nil, err 95 + if logger == nil { 96 + logger = slog.Default() 108 97 } 98 + 99 + // NOTE: unused second argument is not an 'error 100 + newPDSPerDayLimiter, _ := slidingwindow.NewLimiter(time.Hour*24, config.NewPDSPerDayLimit, windowFunc) 101 + 109 102 s := &Slurper{ 110 - cb: cb, 111 - db: db, 112 - active: make(map[string]*activeSub), 113 - Limiters: make(map[uint]*Limiters), 114 - DefaultPerSecondLimit: opts.DefaultPerSecondLimit, 115 - DefaultPerHourLimit: opts.DefaultPerHourLimit, 116 - DefaultPerDayLimit: opts.DefaultPerDayLimit, 117 - DefaultRepoLimit: opts.DefaultRepoLimit, 118 - ConcurrencyPerPDS: opts.ConcurrencyPerPDS, 119 - MaxQueuePerPDS: opts.MaxQueuePerPDS, 120 - ssl: opts.SSL, 121 - shutdownChan: make(chan bool), 122 - shutdownResult: make(chan []error), 123 - log: opts.Logger, 124 - } 125 - if err := s.loadConfig(); err != nil { 126 - return nil, err 103 + cb: cb, 104 + db: db, 105 + Config: config, 106 + active: make(map[string]*activeSub), 107 + Limiters: make(map[uint]*Limiters), 108 + shutdownChan: make(chan bool), 109 + shutdownResult: make(chan []error), 110 + NewPDSPerDayLimiter: newPDSPerDayLimiter, 111 + log: logger, 127 112 } 128 113 129 114 // Start a goroutine to flush cursors to the DB every 30s ··· 222 207 return errs 223 208 } 224 209 225 - func (s *Slurper) loadConfig() error { 226 - var sc SlurpConfig 227 - if err := s.db.Find(&sc).Error; err != nil { 228 - return err 229 - } 230 - 231 - if sc.ID == 0 { 232 - if err := s.db.Create(&SlurpConfig{}).Error; err != nil { 233 - return err 234 - } 235 - } 236 - 237 - s.newSubsDisabled = sc.NewSubsDisabled 238 - s.trustedDomains = sc.TrustedDomains 239 - 240 - s.NewPDSPerDayLimiter, _ = slidingwindow.NewLimiter(time.Hour*24, sc.NewPDSPerDayLimit, windowFunc) 241 - 242 - return nil 243 - } 244 - 245 - type SlurpConfig struct { 246 - gorm.Model 247 - 248 - NewSubsDisabled bool 249 - TrustedDomains pq.StringArray `gorm:"type:text[]"` 250 - NewPDSPerDayLimit int64 251 - } 252 - 253 - func (s *Slurper) SetNewSubsDisabled(dis bool) error { 254 - s.lk.Lock() 255 - defer s.lk.Unlock() 256 - 257 - if err := s.db.Model(SlurpConfig{}).Where("id = 1").Update("new_subs_disabled", dis).Error; err != nil { 258 - return err 259 - } 260 - 261 - s.newSubsDisabled = dis 262 - return nil 263 - } 264 - 265 - func (s *Slurper) GetNewSubsDisabledState() bool { 266 - s.lk.Lock() 267 - defer s.lk.Unlock() 268 - return s.newSubsDisabled 269 - } 270 - 271 - func (s *Slurper) SetNewPDSPerDayLimit(limit int64) error { 272 - s.lk.Lock() 273 - defer s.lk.Unlock() 274 - 275 - if err := s.db.Model(SlurpConfig{}).Where("id = 1").Update("new_pds_per_day_limit", limit).Error; err != nil { 276 - return err 277 - } 278 - 279 - s.NewPDSPerDayLimiter.SetLimit(limit) 280 - return nil 281 - } 282 - 283 - func (s *Slurper) GetNewPDSPerDayLimit() int64 { 284 - s.lk.Lock() 285 - defer s.lk.Unlock() 286 - return s.NewPDSPerDayLimiter.Limit() 287 - } 288 - 289 - func (s *Slurper) AddTrustedDomain(domain string) error { 290 - s.lk.Lock() 291 - defer s.lk.Unlock() 292 - 293 - if err := s.db.Model(SlurpConfig{}).Where("id = 1").Update("trusted_domains", gorm.Expr("array_append(trusted_domains, ?)", domain)).Error; err != nil { 294 - return err 295 - } 296 - 297 - s.trustedDomains = append(s.trustedDomains, domain) 298 - return nil 299 - } 300 - 301 - func (s *Slurper) RemoveTrustedDomain(domain string) error { 302 - s.lk.Lock() 303 - defer s.lk.Unlock() 304 - 305 - if err := s.db.Model(SlurpConfig{}).Where("id = 1").Update("trusted_domains", gorm.Expr("array_remove(trusted_domains, ?)", domain)).Error; err != nil { 306 - if errors.Is(err, gorm.ErrRecordNotFound) { 307 - return nil 308 - } 309 - return err 310 - } 311 - 312 - for i, d := range s.trustedDomains { 313 - if d == domain { 314 - s.trustedDomains = append(s.trustedDomains[:i], s.trustedDomains[i+1:]...) 315 - break 316 - } 317 - } 318 - 319 - return nil 320 - } 321 - 322 - func (s *Slurper) SetTrustedDomains(domains []string) error { 323 - s.lk.Lock() 324 - defer s.lk.Unlock() 325 - 326 - if err := s.db.Model(SlurpConfig{}).Where("id = 1").Update("trusted_domains", domains).Error; err != nil { 327 - return err 328 - } 329 - 330 - s.trustedDomains = domains 331 - return nil 332 - } 333 - 334 - func (s *Slurper) GetTrustedDomains() []string { 335 - s.lk.Lock() 336 - defer s.lk.Unlock() 337 - return s.trustedDomains 338 - } 339 - 340 210 var ErrNewSubsDisabled = fmt.Errorf("new subscriptions temporarily disabled") 341 211 342 212 // Checks whether a host is allowed to be subscribed to ··· 348 218 } 349 219 350 220 // Check if the host is a trusted domain 351 - for _, d := range s.trustedDomains { 221 + for _, d := range s.Config.TrustedDomains { 352 222 // If the domain starts with a *., it's a wildcard 353 223 if strings.HasPrefix(d, "*.") { 354 224 // Cut off the * so we have .domain.com ··· 362 232 } 363 233 } 364 234 365 - return !s.newSubsDisabled 235 + return !s.Config.NewSubsDisabled 366 236 } 367 237 368 238 func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adminOverride bool, rateOverrides *PDSRates) error { ··· 393 263 // New PDS! 394 264 npds := PDS{ 395 265 Host: host, 396 - SSL: s.ssl, 266 + SSL: s.Config.SSL, 397 267 Registered: reg, 398 - RateLimit: float64(s.DefaultPerSecondLimit), 399 - HourlyEventLimit: s.DefaultPerHourLimit, 400 - DailyEventLimit: s.DefaultPerDayLimit, 401 - RepoLimit: s.DefaultRepoLimit, 268 + RateLimit: float64(s.Config.DefaultPerSecondLimit), 269 + HourlyEventLimit: s.Config.DefaultPerHourLimit, 270 + DailyEventLimit: s.Config.DefaultPerDayLimit, 271 + RepoLimit: s.Config.DefaultRepoLimit, 402 272 } 403 273 if rateOverrides != nil { 404 274 npds.RateLimit = float64(rateOverrides.PerSecond) ··· 477 347 } 478 348 479 349 protocol := "ws" 480 - if s.ssl { 350 + if s.Config.SSL { 481 351 protocol = "wss" 482 352 } 483 353