this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

relay state synchronization: requestCrawl, admin endpoints (#1061)

- more complete rainbow forwarding to the relay backing it ("upstream
relay")
- do requestCrawl forwarding in relay. code in rainbow is still there,
but will remove config for rainbows deployed on new relay instances
("relay1")
- have relay also forward (some) admin endpoints to the same set of
"sibling" relays that requestCrawl gets forwarded
- HTTP header hygiene to prevent self-slurps: relay and rainbow set
`Server` and `Via` headers when appropriate, and relay checks this when
slurping. include the exact string `atproto-relay` in user-agents.
- integrate basic SSRF checks: when doing CheckHost (deciding whether to
add hostname to database), and at slurp time (when actually opening
websocket)

Expectation is that this results in the important info getting
synchronized between our multiple new prod relays, while preventing
"self-slurps" and forwarding loops:

- requestCrawl
- admin add-host
- domain bans
- account takedowns
- PDS takedowns
- PDS account limit changes

~This is marked Draft until I can self-review and test a bit more, but I
think all the pieces are here~. Don't have a test framework for this
part of relay/rainbow interaction :cry:.

authored by

bnewbold and committed by
GitHub
d61b8cb3 aee9ef95

+258 -52
+1
cmd/rainbow/README.md
··· 8 8 9 9 - retains "backfill window" on local disk (using [pebble](https://github.com/cockroachdb/pebble)) 10 10 - serves the `com.atproto.sync.subscribeRepos` endpoint (WebSocket) 11 + - proxies through public and administrative API requests to the backing host 11 12 - retains upstream firehose "sequence numbers" 12 13 - does not validate events (signatures, repo tree, hashes, etc), just passes through 13 14 - does not archive or mirror individual records or entire repositories (or implement related API endpoints)
+1 -1
cmd/rainbow/main.go
··· 186 186 CollectionDirHost: collectionDirHost, 187 187 CursorFile: cctx.String("cursor-file"), 188 188 PebbleOptions: &ppopts, 189 - UserAgent: fmt.Sprintf("rainbow/%s", versioninfo.Short()), 189 + UserAgent: fmt.Sprintf("rainbow/%s (atproto-relay)", versioninfo.Short()), 190 190 } 191 191 spl, err = splitter.NewSplitter(conf, nextCrawlers) 192 192 } else {
+3
cmd/relay/HACKING.md
··· 17 17 - messages for an account (DID) which come from a host connection which are not the current PDS host for that account are dropped. If there is a mismatch, the relay will re-resolve the identity (DID document) and double-check before dropping the message, in case there was an account migration not reflected yet in local caches. 18 18 - if a host sends no messages for a long period, the relay will drop the connection and set the host status to "idle"; this is common for low-traffic PDS instances (eg, handful of accounts). The expectation is that the host would then send a `requestCrawl` ping next time there is a new event. 19 19 - when the relay restarts, it connects to all "active" hosts 20 + - if configured with "sibling" relay instances, will forward `requestCrawl` and some administrative requests to each of those instances. The use-case is to keep a cluster of independent relays relatively synchronized in terms of hosts subscribed, takedowns, and quotas. Requests are only forwarded if processed successfully on the current instance. `User-Agent` is passed through from original request, but the `Via` header is set, and used to prevent forwarding loops. Auth headers are passed through; admin forwarding only works if the same secret works for all sibling relays. API requests forwarded to a remote rainbow instance (in front of a relay), should get proxied through to that relay successfully. 21 + - both the relay and rainbow set a `Server` header in HTTP responses (including WebSocket connections), and the relay checks for this header when connecting. If it finds the string `atproto-relay` in the header, it refuses the connection, to prevent relay request loops. This is just a conservative default behavior; relays consuming from other relays is allowed by protocol. 22 + - when connecting to remote hosts, including WebSocket subscriptions, the relay includes basic SSRF protections against connecting to private, reserved, or local IP addresses; or ports other than 80 or 443. This check is skipped if the remote host is specifically localhost (with an explicit port). If needed this constraint could be made configurable. 20 23 21 24 22 25 ## Internal Implementation Details
+86
cmd/relay/forward.go
··· 1 + package main 2 + 3 + import ( 4 + "bytes" 5 + "io" 6 + "net/http" 7 + "strings" 8 + 9 + "github.com/bluesky-social/indigo/cmd/relay/relay" 10 + 11 + "github.com/labstack/echo/v4" 12 + ) 13 + 14 + // Forwards HTTP request on to sibling relay instances, if they are configured. 15 + // 16 + // This method expects to be run "in the background" as a goroutine, so it doesn't take a `context.Context`, and does not return an `error`. It logs both success and failure. The `echo.Context` request has presumably been read, so any body is passed separately. The `echo.Context` response is presumably being returned or already finalized concurrently. 17 + func (s *Service) ForwardSiblingRequest(c echo.Context, body []byte) { 18 + 19 + if len(s.config.SiblingRelayHosts) == 0 { 20 + return 21 + } 22 + 23 + // if the request itself was already forwarded, or user-agent seems to be a relay, then don't forward on further (to prevent loops) 24 + req := c.Request() 25 + for _, via := range req.Header.Values("Via") { 26 + if strings.Contains(via, "atproto-relay") { 27 + s.logger.Info("not re-forwarding request to sibling relay", "header", "Via", "value", via) 28 + return 29 + } 30 + } 31 + for _, ua := range req.Header.Values("User-Agent") { 32 + if strings.Contains(ua, "atproto-relay") { 33 + s.logger.Info("not re-forwarding request to sibling relay", "header", "User-Agent", "value", ua) 34 + return 35 + } 36 + } 37 + 38 + for _, rawHost := range s.config.SiblingRelayHosts { 39 + hostname, noSSL, err := relay.ParseHostname(rawHost) 40 + if err != nil { 41 + s.logger.Error("invalid sibling hostname configured", "host", rawHost, "err", err) 42 + return 43 + } 44 + u := req.URL 45 + u.Host = hostname 46 + if noSSL { 47 + u.Scheme = "http" 48 + } else { 49 + u.Scheme = "https" 50 + } 51 + var b io.Reader 52 + if body != nil { 53 + b = bytes.NewBuffer(body) 54 + } 55 + upstreamReq, err := http.NewRequest(req.Method, u.String(), b) 56 + if err != nil { 57 + s.logger.Error("creating admin forward request failed", "method", req.Method, "url", u.String(), "err", err) 58 + continue 59 + } 60 + 61 + // copy some headers from inbound request 62 + for _, hdr := range []string{"Accept", "User-Agent", "Authorization", "Content-Type"} { 63 + val := req.Header.Get(hdr) 64 + if val != "" { 65 + upstreamReq.Header.Set(hdr, val) 66 + } 67 + } 68 + 69 + // add Via header (critical to prevent forwarding loops) 70 + upstreamReq.Header.Add("Via", req.Proto+" atproto-relay") 71 + 72 + upstreamResp, err := s.siblingClient.Do(upstreamReq) 73 + if err != nil { 74 + s.logger.Warn("forwarded admin HTTP request failed", "method", req.Method, "sibling", hostname, "url", u.String(), "err", err) 75 + continue 76 + } 77 + if !(upstreamResp.StatusCode >= 200 && upstreamResp.StatusCode < 300) { 78 + respBytes, _ := io.ReadAll(upstreamResp.Body) 79 + upstreamResp.Body.Close() 80 + s.logger.Warn("forwarded admin HTTP request failed", "method", req.Method, "sibling", hostname, "url", u.String(), "statusCode", upstreamResp.StatusCode, "body", string(respBytes)) 81 + continue 82 + } 83 + upstreamResp.Body.Close() 84 + s.logger.Info("successfully forwarded admin HTTP request", "method", req.Method, "url", u.String()) 85 + } 86 + }
+8
cmd/relay/handlers.go
··· 1 1 package main 2 2 3 3 import ( 4 + "encoding/json" 4 5 "errors" 5 6 "fmt" 6 7 "net/http" ··· 56 57 if err := s.relay.HostChecker.CheckHost(ctx, hostURL); err != nil { 57 58 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "HostNotFound", Message: fmt.Sprintf("host server unreachable: %s", err)}) 58 59 } 60 + 61 + // forward on to any sibling instances (note that sometimes is, sometimes isn't an admin request) 62 + b, err := json.Marshal(body) 63 + if err != nil { 64 + return err 65 + } 66 + go s.ForwardSiblingRequest(c, b) 59 67 60 68 return s.relay.SubscribeToHost(ctx, hostname, noSSL, false) 61 69 }
+48 -9
cmd/relay/handlers_admin.go
··· 1 1 package main 2 2 3 3 import ( 4 + "encoding/json" 4 5 "errors" 5 6 "fmt" 6 7 "net/http" ··· 17 18 dto "github.com/prometheus/client_model/go" 18 19 ) 19 20 20 - // this is the same as the regular com.atproto.sync.requestCrawl endpoint, except it sets a flag to bypass configuration checks 21 + // This endpoint is basically the same as the regular com.atproto.sync.requestCrawl endpoint, except it sets a flag to bypass configuration checks. 21 22 func (s *Service) handleAdminRequestCrawl(c echo.Context) error { 22 23 var body comatproto.SyncRequestCrawl_Input 23 24 if err := c.Bind(&body); err != nil { 24 25 return &echo.HTTPError{Code: http.StatusBadRequest, Message: fmt.Sprintf("invalid body: %s", err)} 25 26 } 26 27 27 - // func (s *Service) handleComAtprotoSyncRequestCrawl(ctx context.Context,body *comatproto.SyncRequestCrawl_Input) error 28 28 return s.handleComAtprotoSyncRequestCrawl(c, &body, true) 29 29 } 30 30 ··· 59 59 60 60 s.relay.HostPerDayLimiter.SetLimit(limit) 61 61 62 - // TODO: forward to SiblingRelayHosts 62 + // NOTE: *not* forwarding to sibling instances 63 + 63 64 return c.JSON(http.StatusOK, map[string]any{ 64 65 "success": "true", 65 66 }) ··· 97 98 } 98 99 } 99 100 100 - // TODO: forward to SiblingRelayHosts 101 + // forward on to any sibling instances 102 + b, err := json.Marshal(body) 103 + if err != nil { 104 + return err 105 + } 106 + go s.ForwardSiblingRequest(c, b) 107 + 101 108 return c.JSON(http.StatusOK, map[string]any{ 102 109 "success": "true", 103 110 }) ··· 135 142 } 136 143 } 137 144 138 - // TODO: forward to SiblingRelayHosts 145 + // forward on to any sibling instances 146 + b, err := json.Marshal(body) 147 + if err != nil { 148 + return err 149 + } 150 + go s.ForwardSiblingRequest(c, b) 151 + 139 152 return c.JSON(http.StatusOK, map[string]any{ 140 153 "success": "true", 141 154 }) ··· 306 319 return err 307 320 } 308 321 322 + // forward on to any sibling instances 323 + go s.ForwardSiblingRequest(c, nil) 324 + 309 325 return c.JSON(http.StatusOK, map[string]any{ 310 326 "success": "true", 311 327 }) ··· 337 353 // kill any active connection (there may not be one, so ignore error) 338 354 _ = s.relay.Slurper.KillUpstreamConnection(ctx, host.Hostname, false) 339 355 340 - // TODO: forward to SiblingRelayHosts 356 + // forward on to any sibling instances 357 + go s.ForwardSiblingRequest(c, nil) 358 + 341 359 return c.JSON(http.StatusOK, map[string]any{ 342 360 "success": "true", 343 361 }) ··· 366 384 } 367 385 } 368 386 369 - // TODO: forward to SiblingRelayHosts 387 + // forward on to any sibling instances 388 + go s.ForwardSiblingRequest(c, nil) 389 + 370 390 return c.JSON(http.StatusOK, map[string]any{ 371 391 "success": "true", 372 392 }) ··· 412 432 return err 413 433 } 414 434 415 - // TODO: forward to SiblingRelayHosts 435 + // forward on to any sibling instances 436 + b, err := json.Marshal(body) 437 + if err != nil { 438 + return err 439 + } 440 + go s.ForwardSiblingRequest(c, b) 441 + 416 442 return c.JSON(http.StatusOK, map[string]any{ 417 443 "success": "true", 418 444 }) ··· 431 457 return err 432 458 } 433 459 434 - // TODO: forward to SiblingRelayHosts 460 + // forward on to any sibling instances 461 + b, err := json.Marshal(body) 462 + if err != nil { 463 + return err 464 + } 465 + go s.ForwardSiblingRequest(c, b) 466 + 435 467 return c.JSON(http.StatusOK, map[string]any{ 436 468 "success": "true", 437 469 }) ··· 469 501 if err := s.relay.UpdateHostAccountLimit(ctx, host.ID, *body.RepoLimit); err != nil { 470 502 return echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to update limits: %s", err)) 471 503 } 504 + 505 + // forward on to any sibling instances 506 + b, err := json.Marshal(body) 507 + if err != nil { 508 + return err 509 + } 510 + go s.ForwardSiblingRequest(c, b) 472 511 473 512 return c.JSON(http.StatusOK, map[string]any{ 474 513 "success": "true",
+1 -1
cmd/relay/main.go
··· 246 246 } 247 247 248 248 relayConfig := relay.DefaultRelayConfig() 249 - relayConfig.UserAgent = fmt.Sprintf("indigo-relay/%s", versioninfo.Short()) 249 + relayConfig.UserAgent = fmt.Sprintf("indigo-relay/%s (atproto-relay)", versioninfo.Short()) 250 250 relayConfig.ConcurrencyPerHost = cctx.Int("host-concurrency") 251 251 relayConfig.DefaultRepoLimit = cctx.Int64("default-account-limit") 252 252 relayConfig.HostPerDayLimit = cctx.Int64("new-hosts-per-day-limit")
+8 -2
cmd/relay/relay/host_checker.go
··· 4 4 "context" 5 5 "fmt" 6 6 "net/http" 7 + "time" 7 8 8 9 comatproto "github.com/bluesky-social/indigo/api/atproto" 9 10 "github.com/bluesky-social/indigo/atproto/identity" 11 + "github.com/bluesky-social/indigo/util/ssrf" 10 12 "github.com/bluesky-social/indigo/xrpc" 11 13 ) 12 14 ··· 28 30 29 31 func NewHostClient(userAgent string) *HostClient { 30 32 if userAgent == "" { 31 - userAgent = "indigo-relay" 33 + userAgent = "indigo-relay (atproto-relay)" 34 + } 35 + c := http.Client{ 36 + Timeout: 5 * time.Second, 37 + Transport: ssrf.PublicOnlyTransport(), 32 38 } 33 39 return &HostClient{ 34 - Client: http.DefaultClient, 40 + Client: &c, 35 41 UserAgent: userAgent, 36 42 } 37 43 }
+1 -1
cmd/relay/relay/relay.go
··· 53 53 func DefaultRelayConfig() *RelayConfig { 54 54 // NOTE: many of these defaults are clobbered by CLI arguments 55 55 return &RelayConfig{ 56 - UserAgent: "indigo-relay", 56 + UserAgent: "indigo-relay (atproto-relay)", 57 57 DefaultRepoLimit: 100, 58 58 TrustedRepoLimit: 10_000_000, 59 59 ConcurrencyPerHost: 40,
+21 -4
cmd/relay/relay/slurper.go
··· 6 6 "log/slog" 7 7 "math/rand" 8 8 "net/http" 9 + "strings" 9 10 "sync" 10 11 "sync/atomic" 11 12 "time" ··· 14 15 "github.com/bluesky-social/indigo/cmd/relay/relay/models" 15 16 "github.com/bluesky-social/indigo/cmd/relay/stream" 16 17 "github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel" 18 + "github.com/bluesky-social/indigo/util/ssrf" 17 19 18 20 "github.com/RussellLuo/slidingwindow" 19 21 "github.com/gorilla/websocket" ··· 60 62 func DefaultSlurperConfig() *SlurperConfig { 61 63 // NOTE: many of these defaults are overruled by DefaultRelayConfig, or even process CLI arg defaults 62 64 return &SlurperConfig{ 63 - UserAgent: "indigo-relay", 65 + UserAgent: "indigo-relay (atproto-relay)", 64 66 ConcurrencyPerHost: 40, 65 67 // NOTE: queue depth doesn't do anything with current parallel scheduler implementation 66 68 QueueDepthPerHost: 1000, ··· 286 288 HandshakeTimeout: time.Second * 5, 287 289 } 288 290 291 + // if this isn't a localhost / private connection, then we should enable SSRF protections 292 + if !host.NoSSL { 293 + netDialer := ssrf.PublicOnlyDialer() 294 + d.NetDialContext = netDialer.DialContext 295 + } 296 + 289 297 cursor := host.LastSeq 290 298 291 299 connectedInbound.Inc() ··· 306 314 } 307 315 hdr := make(http.Header) 308 316 hdr.Add("User-Agent", s.Config.UserAgent) 309 - conn, res, err := d.DialContext(ctx, u, hdr) 317 + conn, resp, err := d.DialContext(ctx, u, hdr) 310 318 if err != nil { 311 319 s.logger.Warn("dialing failed", "host", host.Hostname, "err", err, "backoff", backoff) 312 320 time.Sleep(sleepForBackoff(backoff)) ··· 323 331 continue 324 332 } 325 333 326 - s.logger.Debug("event subscription response", "code", res.StatusCode, "url", u) 334 + // check if we connected to a relay (eg, this indigo relay, or rainbow) and drop if so 335 + serverHdr := resp.Header.Get("Server") 336 + if strings.Contains(serverHdr, "atproto-relay") { 337 + s.logger.Warn("subscribed host is atproto relay of some kind, banning", "server", serverHdr, "url", u, "hostname", sub.Hostname) 338 + if err := s.Config.PersistHostStatusCallback(ctx, sub.HostID, models.HostStatusBanned); err != nil { 339 + s.logger.Error("failed mark host as banned", "hostname", sub.Hostname, "err", err) 340 + } 341 + } 342 + 343 + s.logger.Debug("event subscription response", "code", resp.StatusCode, "url", u) 327 344 328 345 curCursor := cursor 329 346 if err := s.handleConnection(ctx, conn, &cursor, sub); err != nil { ··· 369 386 logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "sync") 370 387 logger.Debug("commit event") 371 388 if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoSync: evt}, sub.Hostname, sub.HostID); err != nil { 372 - s.logger.Error("failed handling event", "err", err) 389 + logger.Error("failed handling event", "err", err) 373 390 } 374 391 sub.UpdateSeq() 375 392
+13 -6
cmd/relay/service.go
··· 23 23 relay *relay.Relay 24 24 config ServiceConfig 25 25 26 - crawlForwardClient http.Client 26 + siblingClient http.Client 27 27 } 28 28 29 29 type ServiceConfig struct { ··· 56 56 } 57 57 58 58 svc := &Service{ 59 - logger: slog.Default().With("system", "relay"), 60 - relay: r, 61 - config: *config, 62 - crawlForwardClient: http.Client{}, 59 + logger: slog.Default().With("system", "relay"), 60 + relay: r, 61 + config: *config, 62 + siblingClient: http.Client{ 63 + Timeout: 10 * time.Second, 64 + }, 63 65 } 64 - svc.crawlForwardClient.Timeout = time.Second * 5 65 66 66 67 return svc, nil 67 68 } ··· 91 92 AllowOrigins: []string{"*"}, 92 93 AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept, echo.HeaderAuthorization}, 93 94 })) 95 + e.Use(func(next echo.HandlerFunc) echo.HandlerFunc { 96 + return func(c echo.Context) error { 97 + c.Response().Header().Set(echo.HeaderServer, svc.relay.Config.UserAgent) 98 + return next(c) 99 + } 100 + }) 94 101 e.Use(middleware.LoggerWithConfig(middleware.DefaultLoggerConfig)) 95 102 96 103 // React uses a virtual router, so we need to serve the index.html for all
+2 -2
cmd/relay/stream/persist/diskpersist/diskpersist.go
··· 682 682 needslogs := true 683 683 if since != 0 { 684 684 // find the log file that starts before our since 685 - result := dp.meta.Debug().Order("seq_start desc").Where("seq_start < ?", since).Limit(1).Find(&logs) 685 + result := dp.meta.Order("seq_start desc").Where("seq_start < ?", since).Limit(1).Find(&logs) 686 686 if result.Error != nil { 687 687 return result.Error 688 688 } ··· 696 696 // don't decrease '10' below 2 because we should always do two passes through this if the above before-chunk query was used. 697 697 for i := 0; i < 10; i++ { 698 698 if needslogs { 699 - if err := dp.meta.Debug().Order("seq_start asc").Find(&logs, "seq_start >= ?", since).Error; err != nil { 699 + if err := dp.meta.Order("seq_start asc").Find(&logs, "seq_start >= ?", since).Error; err != nil { 700 700 return err 701 701 } 702 702 }
+3 -3
cmd/relay/stream/schedulers/parallel/parallel.go
··· 86 86 <-p.out 87 87 } 88 88 89 - p.log.Info("parallel scheduler shutdown complete") 89 + p.log.Info("parallel scheduler shutdown complete", "ident", p.ident) 90 90 } 91 91 92 92 type consumerTask struct { ··· 138 138 p.itemsActive.Inc() 139 139 seq := work.val.Sequence() 140 140 if err := p.do(context.TODO(), work.val); err != nil { 141 - p.log.Error("event handler failed", "err", err) 141 + p.log.Error("event handler failed", "ident", p.ident, "err", err) 142 142 } 143 143 p.itemsProcessed.Inc() 144 144 145 145 p.lk.Lock() 146 146 rem, ok := p.active[work.repo] 147 147 if !ok { 148 - p.log.Error("should always have an 'active' entry if a worker is processing a job") 148 + p.log.Error("should always have an 'active' entry if a worker is processing a job", "ident", p.ident) 149 149 } 150 150 151 151 if len(rem) == 0 {
+12 -11
splitter/handlers.go
··· 6 6 "io" 7 7 "net/http" 8 8 "net/url" 9 - "strings" 10 9 11 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 11 "github.com/bluesky-social/indigo/xrpc" ··· 66 65 } 67 66 68 67 // if that was successful, then forward on to the other upstreams (in goroutines) 69 - for _, c := range s.nextCrawlers { 68 + for _, nc := range s.nextCrawlers { 70 69 // intentional local copy of loop variable 71 - hostname := c.String() 70 + crawler := nc.String() 72 71 go func() { 73 72 // new context to outlive original HTTP request 74 73 ctx := context.Background() 75 74 xrpcc := xrpc.Client{ 76 75 Client: s.peerClient, 77 - Host: hostname, 76 + Host: crawler, 78 77 } 79 78 if err := comatproto.SyncRequestCrawl(ctx, &xrpcc, &body); err != nil { 80 - s.logger.Warn("failed to forward requestCrawl", "upstream", hostname, "targetHost", body.Hostname, "err", err) 79 + s.logger.Warn("failed to forward requestCrawl", "crawler", crawler, "targetHost", body.Hostname, "err", err) 80 + } else { 81 + s.logger.Info("successfully forwarded requestCrawl", "crawler", crawler, "targetHost", body.Hostname) 81 82 } 82 - s.logger.Info("successfully forwarded requestCrawl", "upstream", hostname, "targetHost", body.Hostname) 83 83 }() 84 84 } 85 85 ··· 118 118 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "failed to proxy to upstream relay"}) 119 119 } 120 120 121 - for k, vals := range req.Header { 122 - if strings.ToLower(k) == "accept" { 123 - upstreamReq.Header.Add(k, vals[0]) 121 + // copy subset of request headers 122 + for _, hdr := range []string{"Accept", "User-Agent", "Authorization", "Via", "Content-Type", "Content-Length"} { 123 + val := req.Header.Get(hdr) 124 + if val != "" { 125 + upstreamReq.Header.Set(hdr, val) 124 126 } 125 127 } 126 - upstreamReq.Header.Add("User-Agent", s.conf.UserAgent) 127 128 128 129 upstreamResp, err := s.upstreamClient.Do(upstreamReq) 129 130 if err != nil { ··· 131 132 } 132 133 defer upstreamResp.Body.Close() 133 134 134 - // copy a subset of headers 135 + // copy a subset of response headers 135 136 for _, hdr := range []string{"Content-Type", "Content-Length", "Location"} { 136 137 val := upstreamResp.Header.Get(hdr) 137 138 if val != "" {
+50 -12
splitter/splitter.go
··· 44 44 45 45 upstreamClient *http.Client 46 46 peerClient *http.Client 47 - nextCrawlers []*url.URL 47 + nextCrawlers []url.URL 48 48 } 49 49 50 50 type SplitterConfig struct { ··· 105 105 logger = slog.Default().With("system", "splitter") 106 106 } 107 107 108 - var nextCrawlerURLs []*url.URL 109 - if len(nextCrawlers) > 0 { 110 - nextCrawlerURLs = make([]*url.URL, len(nextCrawlers)) 111 - for i, tu := range nextCrawlers { 112 - var err error 113 - nextCrawlerURLs[i], err = url.Parse(tu) 114 - if err != nil { 115 - return nil, fmt.Errorf("failed to parse next-crawler url: %w", err) 116 - } 117 - logger.Info("configuring relay for requestCrawl", "host", nextCrawlerURLs[i]) 108 + var nextCrawlerURLs []url.URL 109 + for _, raw := range nextCrawlers { 110 + if raw == "" { 111 + continue 112 + } 113 + u, err := url.Parse(raw) 114 + if err != nil { 115 + return nil, fmt.Errorf("failed to parse next-crawler url: %w", err) 116 + } 117 + if u.Host == "" { 118 + return nil, fmt.Errorf("empty URL host for next crawler: %s", raw) 118 119 } 120 + nextCrawlerURLs = append(nextCrawlerURLs, *u) 121 + } 122 + if len(nextCrawlerURLs) > 0 { 123 + logger.Info("configured crawler forwarding", "crawlers", nextCrawlerURLs) 119 124 } 120 125 121 126 _, err := url.Parse(conf.UpstreamHostHTTP()) ··· 195 200 AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept, echo.HeaderAuthorization}, 196 201 })) 197 202 203 + e.Use(func(next echo.HandlerFunc) echo.HandlerFunc { 204 + return func(c echo.Context) error { 205 + c.Response().Header().Set(echo.HeaderServer, s.conf.UserAgent) 206 + return next(c) 207 + } 208 + }) 209 + 198 210 /* 199 211 if !s.ssl { 200 212 e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{ ··· 208 220 e.Use(svcutil.MetricsMiddleware) 209 221 e.HTTPErrorHandler = s.errorHandler 210 222 211 - e.POST("/xrpc/com.atproto.sync.requestCrawl", s.HandleComAtprotoSyncRequestCrawl) 223 + if len(s.nextCrawlers) > 0 { 224 + // forwards on to multiple hosts, but strips several headers (like User-Agent) 225 + s.logger.Info("using legacy requestCrawl forwarding") 226 + e.POST("/xrpc/com.atproto.sync.requestCrawl", s.HandleComAtprotoSyncRequestCrawl) 227 + } else { 228 + // simply proxies to upstream 229 + e.POST("/xrpc/com.atproto.sync.requestCrawl", s.ProxyRequestUpstream) 230 + } 212 231 e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.HandleSubscribeRepos) 213 232 214 233 // proxy endpoints to upstream (relay) ··· 218 237 e.GET("/xrpc/com.atproto.sync.listHosts", s.ProxyRequestUpstream) 219 238 e.GET("/xrpc/com.atproto.sync.getHostStatus", s.ProxyRequestUpstream) 220 239 e.GET("/xrpc/com.atproto.sync.getRepo", s.ProxyRequestUpstream) 240 + 241 + // proxy relay admin endpoints for inter-relay synchronization 242 + e.GET("/admin/subs/getUpstreamConns", s.ProxyRequestUpstream) 243 + e.POST("/admin/subs/killUpstream", s.ProxyRequestUpstream) 244 + e.GET("/admin/subs/getEnabled", s.ProxyRequestUpstream) 245 + e.POST("/admin/subs/setEnabled", s.ProxyRequestUpstream) 246 + e.GET("/admin/subs/perDayLimit", s.ProxyRequestUpstream) 247 + e.POST("/admin/subs/setPerDayLimit", s.ProxyRequestUpstream) 248 + e.GET("/admin/subs/listDomainBans", s.ProxyRequestUpstream) 249 + e.POST("/admin/subs/banDomain", s.ProxyRequestUpstream) 250 + e.POST("/admin/subs/unbanDomain", s.ProxyRequestUpstream) 251 + e.POST("/admin/repo/takeDown", s.ProxyRequestUpstream) 252 + e.POST("/admin/repo/reverseTakedown", s.ProxyRequestUpstream) 253 + e.GET("/admin/pds/list", s.ProxyRequestUpstream) 254 + e.POST("/admin/pds/requestCrawl", s.ProxyRequestUpstream) 255 + e.POST("/admin/pds/changeLimits", s.ProxyRequestUpstream) 256 + e.POST("/admin/pds/block", s.ProxyRequestUpstream) 257 + e.POST("/admin/pds/unblock", s.ProxyRequestUpstream) 258 + e.GET("/admin/consumers/list", s.ProxyRequestUpstream) 221 259 222 260 // proxy endpoint to collectiondir 223 261 e.GET("/xrpc/com.atproto.sync.listReposByCollection", s.ProxyRequestCollectionDir)