···1818func (s *Service) handleComAtprotoSyncRequestCrawl(c echo.Context, body *comatproto.SyncRequestCrawl_Input, admin bool) error {
1919 ctx := c.Request().Context()
20202121+ if s.config.DisableRequestCrawl && !admin {
2222+ return c.JSON(http.StatusForbidden, xrpc.XRPCError{ErrStr: "Forbidden", Message: "public requestCrawl not allowed on this relay"})
2323+ }
2424+2125 hostname, noSSL, err := relay.ParseHostname(body.Hostname)
2226 if err != nil {
2327 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("hostname field empty or invalid: %s", body.Hostname)})
+38-42
cmd/rerelay/main.go
···66 "fmt"
77 "io"
88 "log/slog"
99- "net/url"
109 "os"
1110 "os/signal"
1211 "strings"
···4342 Version: versioninfo.Short(),
4443 }
4544 app.Flags = []cli.Flag{
4646- // XXX: actually disabled if empty?
4745 &cli.StringFlag{
4846 Name: "admin-password",
4947 Usage: "secret password/token for accessing admin endpoints (random is used if not set)",
5048 EnvVars: []string{"RELAY_ADMIN_PASSWORD", "RELAY_ADMIN_KEY"},
5149 },
5252- // XXX: not used?
5350 &cli.StringFlag{
5451 Name: "plc-host",
5552 Usage: "method, hostname, and port of PLC registry",
5653 Value: "https://plc.directory",
5757- EnvVars: []string{"ATP_PLC_HOST"},
5454+ EnvVars: []string{"RELAY_PLC_HOST", "ATP_PLC_HOST"},
5855 },
5956 &cli.StringFlag{
6057 Name: "log-level",
···10299 Name: "host-concurrency",
103100 Usage: "number of concurrent worker routines per upstream host",
104101 EnvVars: []string{"RELAY_HOST_CONCURRENCY", "RELAY_CONCURRENCY_PER_PDS"},
105105- Value: 100,
102102+ Value: 40,
103103+ },
104104+ &cli.IntFlag{
105105+ Name: "host-queue-depth",
106106+ Usage: "size of queue (channel) per-host for unprocessed events",
107107+ EnvVars: []string{"RELAY_HOST_QUEUE_SIZE"},
108108+ Value: 1000,
106109 },
107110 &cli.IntFlag{
108111 Name: "default-account-limit",
···111114 EnvVars: []string{"RELAY_DEFAULT_ACCOUUNT_LIMIT", "RELAY_DEFAULT_REPO_LIMIT"},
112115 },
113116 &cli.IntFlag{
114114- Name: "did-cache-size",
117117+ Name: "ident-cache-size",
115118 Value: 5_000_000,
116116- Usage: "size of in-process DID (identity) cache",
117117- EnvVars: []string{"RELAY_DID_CACHE_SIZE"},
119119+ Usage: "size of in-process identity cache (eg, DID docs)",
120120+ EnvVars: []string{"RELAY_IDENT_CACHE_SIZE", "RELAY_DID_CACHE_SIZE"},
121121+ },
122122+ &cli.BoolFlag{
123123+ Name: "disable-request-crawl",
124124+ Usage: "don't process public (un-authenticated) com.atproto.sync.requestCrawl",
125125+ EnvVars: []string{"RELAY_DISABLE_REQUEST_CRAWL"},
126126+ },
127127+ // XXX: should this be handled by rainbow instead of relays?
128128+ &cli.StringSliceFlag{
129129+ Name: "forward-crawl-requests",
130130+ Usage: "servers (eg https://example.com) to forward requestCrawl on to; multiple allowed",
131131+ EnvVars: []string{"RELAY_FORWARD_CRAWL_REQUESTS", "RELAY_NEXT_CRAWLER"},
132132+ },
133133+ &cli.StringSliceFlag{
134134+ Name: "trusted-domains",
135135+ Usage: "domain name suffixes which mark trusted hosts",
136136+ EnvVars: []string{"RELAY_TRUSTED_DOMAINS"},
118137 },
119138 &cli.StringFlag{
120139 Name: "env",
···142161 Value: "http://localhost:4328",
143162 EnvVars: []string{"OTEL_EXPORTER_OTLP_ENDPOINT"},
144163 },
145145- // XXX: refactor this flag
146146- &cli.BoolFlag{
147147- Name: "crawl-insecure-ws",
148148- Usage: "when connecting to PDS instances, use ws:// instead of wss://",
149149- },
150150- &cli.StringSliceFlag{
151151- Name: "forward-crawl-requests",
152152- Usage: "comma-separated list of servers (eg https://example.com) to forward requestCrawl on to",
153153- EnvVars: []string{"RELAY_FORWARD_CRAWL_REQUESTS", "RELAY_NEXT_CRAWLER"},
154154- },
155155- &cli.StringFlag{
156156- Name: "bsky-social-rate-limit-skip",
157157- EnvVars: []string{"BSKY_SOCIAL_RATE_LIMIT_SKIP"},
158158- Usage: "ratelimit bypass secret token for *.bsky.social domains",
159159- },
160164 },
161165 },
162166 }
···205209 SkipHandleVerification: true,
206210 SkipDNSDomainSuffixes: []string{".bsky.social"},
207211 TryAuthoritativeDNS: true,
212212+ PLCURL: cctx.String("plc-host"),
208213 }
209209- dir := identity.NewCacheDirectory(&baseDir, cctx.Int("did-cache-size"), time.Hour*24, time.Minute*2, time.Minute*5)
214214+ dir := identity.NewCacheDirectory(&baseDir, cctx.Int("ident-cache-size"), time.Hour*24, time.Minute*2, time.Minute*5)
210215211216 persistDir := cctx.String("persist-dir")
212217 os.MkdirAll(persistDir, os.ModePerm)
···218223 return fmt.Errorf("setting up disk persister: %w", err)
219224 }
220225221221- svcConfig := DefaultServiceConfig()
222226 relayConfig := relay.DefaultRelayConfig()
223223- relayConfig.SSL = !cctx.Bool("crawl-insecure-ws")
224224- relayConfig.ConcurrencyPerHost = cctx.Int64("host-concurrency")
227227+ relayConfig.ConcurrencyPerHost = cctx.Int("host-concurrency")
228228+ relayConfig.QueueDepthPerHost = cctx.Int("host-queue-depth")
225229 relayConfig.DefaultRepoLimit = cctx.Int64("default-account-limit")
226226- ratelimitBypass := cctx.String("bsky-social-rate-limit-skip")
227227- // TODO: actually use ratelimitBypass for host checks?
228228- _ = ratelimitBypass
229229- nextCrawlers := cctx.StringSlice("forward-crawl-requests")
230230- if len(nextCrawlers) > 0 {
231231- nextCrawlerUrls := make([]*url.URL, len(nextCrawlers))
232232- for i, tu := range nextCrawlers {
233233- var err error
234234- nextCrawlerUrls[i], err = url.Parse(tu)
235235- if err != nil {
236236- return fmt.Errorf("invalid crawl request forwarding URL: %w", err)
237237- }
238238- }
239239- svcConfig.NextCrawlers = nextCrawlerUrls
240240- logger.Info("crawl request forwarding enabled", "servers", svcConfig.NextCrawlers)
230230+ relayConfig.TrustedDomains = cctx.StringSlice("trusted-domains")
231231+232232+ svcConfig := DefaultServiceConfig()
233233+ svcConfig.DisableRequestCrawl = !cctx.Bool("disable-request-crawl")
234234+ svcConfig.ForwardCrawlRequestHosts = cctx.StringSlice("forward-crawl-requests")
235235+ if len(svcConfig.ForwardCrawlRequestHosts) > 0 {
236236+ logger.Info("crawl request forwarding enabled", "servers", svcConfig.ForwardCrawlRequestHosts)
241237 }
242238 if cctx.IsSet("admin-password") {
243239 svcConfig.AdminPassword = cctx.String("admin-password")
+6-2
cmd/rerelay/relay/relay.go
···4141type RelayConfig struct {
4242 SSL bool
4343 DefaultRepoLimit int64
4444- ConcurrencyPerHost int64
4444+ ConcurrencyPerHost int
4545+ QueueDepthPerHost int
4546 SkipAccountHostCheck bool // XXX: only used for testing
4647 LenientSyncValidation bool // XXX: wire through config
4748···5152}
52535354func DefaultRelayConfig() *RelayConfig {
5555+ // NOTE: many of these defaults are CLI arg defaults
5456 return &RelayConfig{
5557 SSL: true,
5658 DefaultRepoLimit: 100,
5757- ConcurrencyPerHost: 100,
5959+ ConcurrencyPerHost: 40,
6060+ QueueDepthPerHost: 1000,
5861 }
5962}
6063···9194 slurpConfig.SSL = config.SSL
9295 slurpConfig.DefaultRepoLimit = config.DefaultRepoLimit
9396 slurpConfig.ConcurrencyPerHost = config.ConcurrencyPerHost
9797+ slurpConfig.QueueDepthPerHost = config.QueueDepthPerHost
9498 // register callbacks to persist cursors and host state in database
9599 slurpConfig.PersistCursorCallback = r.PersistHostCursors
96100 slurpConfig.PersistHostStatusCallback = r.UpdateHostStatus
+11-11
cmd/rerelay/relay/slurper.go
···77 "log/slog"
88 "math/rand"
99 "sync"
1010+ "sync/atomic"
1011 "time"
11121213 comatproto "github.com/bluesky-social/indigo/api/atproto"
···5859 DefaultPerHourLimit int64
5960 DefaultPerDayLimit int64
6061 DefaultRepoLimit int64
6161- ConcurrencyPerHost int64
6262+ ConcurrencyPerHost int
6363+ QueueDepthPerHost int
6264 NewHostPerDayLimit int64
6365 PersistCursorPeriod time.Duration
6466 PersistCursorCallback PersistCursorFunc
···6668}
67696870func DefaultSlurperConfig() *SlurperConfig {
7171+ // NOTE: many of these defaults are overruled by DefaultRelayConfig, or even process CLI arg defaults
6972 return &SlurperConfig{
7073 SSL: false,
7174 DefaultPerSecondLimit: 50,
···7376 DefaultPerDayLimit: 20_000,
7477 DefaultRepoLimit: 100,
7578 ConcurrencyPerHost: 40,
7979+ QueueDepthPerHost: 1000,
7680 PersistCursorPeriod: time.Second * 10,
7781 }
7882}
···8185type Subscription struct {
8286 Hostname string
8387 HostID uint64
8484- LastSeq int64 // XXX: switch to an atomic instead of lock?
8888+ LastSeq atomic.Int64
8589 Limiters *Limiters // XXX: is this used? or only the separate limiters on Slurper?
86908791 lk sync.RWMutex
···9094}
91959296func (sub *Subscription) UpdateSeq(seq int64) {
9393- sub.lk.Lock()
9494- defer sub.lk.Unlock()
9595- sub.LastSeq = seq
9797+ sub.LastSeq.Store(seq)
9698}
979998100func (sub *Subscription) HostCursor() HostCursor {
···100102 defer sub.lk.Unlock()
101103 return HostCursor{
102104 HostID: sub.HostID,
103103- LastSeq: sub.LastSeq,
105105+ LastSeq: sub.LastSeq.Load(),
104106 }
105107}
106108···448450 instrumentedRSC := stream.NewInstrumentedRepoStreamCallbacks(limiters, rsc.EventHandler)
449451450452 pool := parallel.NewScheduler(
451451- 100, // XXX: concurrency
452452- 1_000, // XXX: max queue per host
453453+ s.Config.ConcurrencyPerHost,
454454+ s.Config.QueueDepthPerHost,
453455 conn.RemoteAddr().String(),
454456 instrumentedRSC.EventHandler,
455457 )
···474476 cursors := make([]HostCursor, len(s.subs))
475477 i := 0
476478 for _, sub := range s.subs {
477477- sub.lk.RLock()
478479 cursors[i] = HostCursor{
479480 HostID: sub.HostID,
480480- LastSeq: sub.LastSeq,
481481+ LastSeq: sub.LastSeq.Load(),
481482 }
482482- sub.lk.RUnlock()
483483 i++
484484 }
485485 s.subsLk.Unlock()
+5-3
cmd/rerelay/service.go
···66 "log/slog"
77 "net"
88 "net/http"
99- "net/url"
109 "strings"
1110 "time"
1211···2928}
// ServiceConfig holds the settings for the HTTP service.
type ServiceConfig struct {
	// Hosts that receive a forwarded copy of each
	// com.atproto.sync.requestCrawl request (sent as HTTP POST).
	ForwardCrawlRequestHosts []string

	// Password checked against HTTP Basic auth for admin access.
	AdminPassword string

	// Maximum time to wait for the requested server socket to become
	// available for use.
	ListenerBootTimeout time.Duration

	// When true, public (unauthenticated) requestCrawl calls are not
	// processed; admin requests are unaffected.
	DisableRequestCrawl bool
}
41434244func DefaultServiceConfig() *ServiceConfig {