this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

more relay hosting tweaks (#695)

This has some low-impact doc and script changes, and a couple of
actually impactful changes to Relay code:

- fixes what I think is a bug/typo with per-PDS, per-second event rate limits
(the per-second default was being assigned to `RepoLimit` instead of `RateLimit`)
- wires through some concurrency and limit values as env vars; this should
not impact the default behaviors
- updates PDS resync numbers (in the database and in results from the API) much
more frequently (every 100 repos checked instead of every 100,000), but doesn't
log for all of those updates (it logs every 10,000). This only impacts
actual PDS resync processes

authored by

bnewbold and committed by
GitHub
308be35c 52f36909

+101 -12
+34 -7
bgs/bgs.go
··· 106 106 EventsSent promclient.Counter 107 107 } 108 108 109 - func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, rf *indexer.RepoFetcher, hr api.HandleResolver, ssl bool, compactInterval time.Duration) (*BGS, error) { 109 + type BGSConfig struct { 110 + SSL bool 111 + CompactInterval time.Duration 112 + DefaultRepoLimit int64 113 + ConcurrencyPerPDS int64 114 + MaxQueuePerPDS int64 115 + } 116 + 117 + func DefaultBGSConfig() *BGSConfig { 118 + return &BGSConfig{ 119 + SSL: true, 120 + CompactInterval: 4 * time.Hour, 121 + DefaultRepoLimit: 100, 122 + ConcurrencyPerPDS: 100, 123 + MaxQueuePerPDS: 1_000, 124 + } 125 + } 126 + 127 + func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, rf *indexer.RepoFetcher, hr api.HandleResolver, config *BGSConfig) (*BGS, error) { 128 + 129 + if config == nil { 130 + config = DefaultBGSConfig() 131 + } 110 132 db.AutoMigrate(User{}) 111 133 db.AutoMigrate(AuthToken{}) 112 134 db.AutoMigrate(models.PDS{}) ··· 121 143 repoman: repoman, 122 144 events: evtman, 123 145 didr: didr, 124 - ssl: ssl, 146 + ssl: config.SSL, 125 147 126 148 consumersLk: sync.RWMutex{}, 127 149 consumers: make(map[uint64]*SocketConsumer), ··· 131 153 132 154 ix.CreateExternalUser = bgs.createExternalUser 133 155 slOpts := DefaultSlurperOptions() 134 - slOpts.SSL = ssl 156 + slOpts.SSL = config.SSL 157 + slOpts.DefaultRepoLimit = config.DefaultRepoLimit 158 + slOpts.ConcurrencyPerPDS = config.ConcurrencyPerPDS 159 + slOpts.MaxQueuePerPDS = config.MaxQueuePerPDS 135 160 s, err := NewSlurper(db, bgs.handleFedEvent, slOpts) 136 161 if err != nil { 137 162 return nil, err ··· 144 169 } 145 170 146 171 compactor := NewCompactor(nil) 147 - compactor.requeueInterval = compactInterval 172 + compactor.requeueInterval = config.CompactInterval 148 173 compactor.Start(bgs) 149 174 bgs.compactor = compactor 150 175 ··· 1158 1183 
peering.Host = durl.Host 1159 1184 peering.SSL = (durl.Scheme == "https") 1160 1185 peering.CrawlRateLimit = float64(s.slurper.DefaultCrawlLimit) 1161 - peering.RepoLimit = s.slurper.DefaultPerSecondLimit 1186 + peering.RateLimit = float64(s.slurper.DefaultPerSecondLimit) 1162 1187 peering.HourlyEventLimit = s.slurper.DefaultPerHourLimit 1163 1188 peering.DailyEventLimit = s.slurper.DefaultPerDayLimit 1164 1189 peering.RepoLimit = s.slurper.DefaultRepoLimit ··· 1616 1641 log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", res.ai.Uid, "did", res.ai.Did) 1617 1642 } 1618 1643 } 1619 - if i%100_000 == 0 { 1620 - log.Warnw("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", numReposToResync, "took", time.Now().Sub(resync.StatusChangedAt)) 1644 + if i%100 == 0 { 1645 + if i%10_000 == 0 { 1646 + log.Warnw("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", numReposToResync, "took", time.Now().Sub(resync.StatusChangedAt)) 1647 + } 1621 1648 resync.NumReposChecked = i 1622 1649 resync.NumReposToResync = numReposToResync 1623 1650 bgs.UpdateResync(resync)
+8
bgs/fedmgr.go
··· 41 41 42 42 DefaultCrawlLimit rate.Limit 43 43 DefaultRepoLimit int64 44 + ConcurrencyPerPDS int64 45 + MaxQueuePerPDS int64 44 46 45 47 NewPDSPerDayLimiter *slidingwindow.Limiter 46 48 ··· 66 68 DefaultPerDayLimit int64 67 69 DefaultCrawlLimit rate.Limit 68 70 DefaultRepoLimit int64 71 + ConcurrencyPerPDS int64 72 + MaxQueuePerPDS int64 69 73 } 70 74 71 75 func DefaultSlurperOptions() *SlurperOptions { ··· 76 80 DefaultPerDayLimit: 20_000, 77 81 DefaultCrawlLimit: rate.Limit(5), 78 82 DefaultRepoLimit: 100, 83 + ConcurrencyPerPDS: 100, 84 + MaxQueuePerPDS: 1_000, 79 85 } 80 86 } 81 87 ··· 101 107 DefaultPerDayLimit: opts.DefaultPerDayLimit, 102 108 DefaultCrawlLimit: opts.DefaultCrawlLimit, 103 109 DefaultRepoLimit: opts.DefaultRepoLimit, 110 + ConcurrencyPerPDS: opts.ConcurrencyPerPDS, 111 + MaxQueuePerPDS: opts.MaxQueuePerPDS, 104 112 ssl: opts.SSL, 105 113 shutdownChan: make(chan bool), 106 114 shutdownResult: make(chan []error),
+7
cmd/bigsky/README.md
··· 111 111 - `RESOLVE_ADDRESS`: DNS server to use 112 112 - `FORCE_DNS_UDP`: recommend "true" 113 113 - `BGS_COMPACT_INTERVAL`: to control CAR compaction scheduling. for example, "8h" (every 8 hours). Set to "0" to disable automatic compaction. 114 + - `MAX_CARSTORE_CONNECTIONS` and `MAX_METADB_CONNECTIONS`: number of concurrent SQL database connections 115 + - `MAX_FETCH_CONCURRENCY`: how many outbound CAR backfill requests to make in parallel 114 116 115 117 There is a health check endpoint at `/xrpc/_health`. Prometheus metrics are exposed by default on port 2471, path `/metrics`. The service logs fairly verbosely to stderr; use `GOLOG_LOG_LEVEL` to control log volume. 116 118 ··· 143 145 144 146 # start sync/backfill of all accounts 145 147 cat hosts.txt | parallel -j1 ./sync_pds.sh {} 148 + 149 + Lastly, you can monitor progress of any ongoing re-syncs: 150 + 151 + # check sync progress for all hosts 152 + cat hosts.txt | parallel -j1 ./sync_status_pds.sh {}
+22 -1
cmd/bigsky/main.go
··· 168 168 EnvVars: []string{"BSKY_SOCIAL_RATE_LIMIT_SKIP"}, 169 169 Usage: "ratelimit bypass secret token for *.bsky.social domains", 170 170 }, 171 + &cli.IntFlag{ 172 + Name: "default-repo-limit", 173 + Value: 100, 174 + EnvVars: []string{"RELAY_DEFAULT_REPO_LIMIT"}, 175 + }, 176 + &cli.IntFlag{ 177 + Name: "concurrency-per-pds", 178 + EnvVars: []string{"RELAY_CONCURRENCY_PER_PDS"}, 179 + Value: 100, 180 + }, 181 + &cli.IntFlag{ 182 + Name: "max-queue-per-pds", 183 + EnvVars: []string{"RELAY_MAX_QUEUE_PER_PDS"}, 184 + Value: 1_000, 185 + }, 171 186 } 172 187 173 188 app.Action = runBigsky ··· 377 392 } 378 393 379 394 log.Infow("constructing bgs") 380 - bgs, err := libbgs.NewBGS(db, ix, repoman, evtman, cachedidr, rf, hr, !cctx.Bool("crawl-insecure-ws"), cctx.Duration("compact-interval")) 395 + bgsConfig := libbgs.DefaultBGSConfig() 396 + bgsConfig.SSL = !cctx.Bool("crawl-insecure-ws") 397 + bgsConfig.CompactInterval = cctx.Duration("compact-interval") 398 + bgsConfig.ConcurrencyPerPDS = cctx.Int64("concurrency-per-pds") 399 + bgsConfig.MaxQueuePerPDS = cctx.Int64("max-queue-per-pds") 400 + bgsConfig.DefaultRepoLimit = cctx.Int64("default-repo-limit") 401 + bgs, err := libbgs.NewBGS(db, ix, repoman, evtman, cachedidr, rf, hr, bgsConfig) 381 402 if err != nil { 382 403 return err 383 404 }
+3 -3
cmd/bigsky/sync_pds.sh
··· 19 19 exit -1 20 20 fi 21 21 22 - echo "resync $1" 23 - http --quiet post https://${RELAY_HOST}/admin/pds/resync Authorization:"Bearer ${RELAY_ADMIN_KEY}" \ 24 - host=$1 22 + echo "POST resync $1" 23 + http --ignore-stdin post https://${RELAY_HOST}/admin/pds/resync Authorization:"Bearer ${RELAY_ADMIN_KEY}" \ 24 + host==$1
+24
cmd/bigsky/sync_status_pds.sh
··· 1 + #!/usr/bin/env bash 2 + 3 + set -e # fail on error 4 + set -u # fail if variable not set in substitution 5 + set -o pipefail # fail if part of a '|' command fails 6 + 7 + if test -z "${RELAY_ADMIN_KEY}"; then 8 + echo "RELAY_ADMIN_KEY secret is not defined" 9 + exit -1 10 + fi 11 + 12 + if test -z "${RELAY_HOST}"; then 13 + echo "RELAY_HOST config not defined" 14 + exit -1 15 + fi 16 + 17 + if test -z "$1"; then 18 + echo "expected PDS hostname as an argument" 19 + exit -1 20 + fi 21 + 22 + echo "GET resync $1" 23 + http --ignore-stdin --pretty all get https://${RELAY_HOST}/admin/pds/resync Authorization:"Bearer ${RELAY_ADMIN_KEY}" \ 24 + host==$1
+3 -1
testing/utils.go
··· 582 582 583 583 tr := &api.TestHandleResolver{} 584 584 585 - b, err := bgs.NewBGS(maindb, ix, repoman, evtman, didr, rf, tr, false, time.Hour*4) 585 + bgsConfig := bgs.DefaultBGSConfig() 586 + bgsConfig.SSL = false 587 + b, err := bgs.NewBGS(maindb, ix, repoman, evtman, didr, rf, tr, bgsConfig) 586 588 if err != nil { 587 589 return nil, err 588 590 }