this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

implement sibling relay admin request forwarding

+93 -8
+93 -8
cmd/relay/handlers_admin.go
··· 24 24 return &echo.HTTPError{Code: http.StatusBadRequest, Message: fmt.Sprintf("invalid body: %s", err)} 25 25 } 26 26 27 - // func (s *Service) handleComAtprotoSyncRequestCrawl(ctx context.Context,body *comatproto.SyncRequestCrawl_Input) error 28 27 return s.handleComAtprotoSyncRequestCrawl(c, &body, true) 29 28 } 30 29 ··· 59 58 60 59 s.relay.HostPerDayLimiter.SetLimit(limit) 61 60 62 - // TODO: forward to SiblingRelayHosts 61 + // NOTE: *not* forwarding to sibling instances 62 + 63 63 return c.JSON(http.StatusOK, map[string]any{ 64 64 "success": "true", 65 65 }) ··· 97 97 } 98 98 } 99 99 100 - // TODO: forward to SiblingRelayHosts 100 + // forward on to any sibling instances 101 + go s.ForwardAdminRequest(c) 102 + 101 103 return c.JSON(http.StatusOK, map[string]any{ 102 104 "success": "true", 103 105 }) ··· 135 137 } 136 138 } 137 139 138 - // TODO: forward to SiblingRelayHosts 140 + // forward on to any sibling instances 141 + go s.ForwardAdminRequest(c) 142 + 139 143 return c.JSON(http.StatusOK, map[string]any{ 140 144 "success": "true", 141 145 }) ··· 306 310 return err 307 311 } 308 312 313 + // NOTE: *not* forwarding this request to sibling relays 314 + 309 315 return c.JSON(http.StatusOK, map[string]any{ 310 316 "success": "true", 311 317 }) ··· 337 343 // kill any active connection (there may not be one, so ignore error) 338 344 _ = s.relay.Slurper.KillUpstreamConnection(ctx, host.Hostname, false) 339 345 340 - // TODO: forward to SiblingRelayHosts 346 + // forward on to any sibling instances 347 + go s.ForwardAdminRequest(c) 348 + 341 349 return c.JSON(http.StatusOK, map[string]any{ 342 350 "success": "true", 343 351 }) ··· 366 374 } 367 375 } 368 376 369 - // TODO: forward to SiblingRelayHosts 377 + // forward on to any sibling instances 378 + go s.ForwardAdminRequest(c) 379 + 370 380 return c.JSON(http.StatusOK, map[string]any{ 371 381 "success": "true", 372 382 }) ··· 412 422 return err 413 423 } 414 424 415 - // TODO: forward to SiblingRelayHosts 425 + // forward on to any sibling instances 426 + go s.ForwardAdminRequest(c) 427 + 416 428 return c.JSON(http.StatusOK, map[string]any{ 417 429 "success": "true", 418 430 }) ··· 431 443 return err 432 444 } 433 445 434 - // TODO: forward to SiblingRelayHosts 446 + // forward on to any sibling instances 447 + go s.ForwardAdminRequest(c) 448 + 435 449 return c.JSON(http.StatusOK, map[string]any{ 436 450 "success": "true", 437 451 }) ··· 470 484 return echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to update limits: %s", err)) 471 485 } 472 486 487 + // forward on to any sibling instances 488 + go s.ForwardAdminRequest(c) 489 + 473 490 return c.JSON(http.StatusOK, map[string]any{ 474 491 "success": "true", 475 492 }) 476 493 } 494 + 495 + // this method expects to be run in a goroutine. it does not take a `context.Context`, the input `echo.Context` has likely be cancelled/closed, and does not return an error (only logs) 496 + func (s *Service) ForwardAdminRequest(c echo.Context) { 497 + 498 + if len(s.config.SiblingRelayHosts) == 0 { 499 + return 500 + } 501 + 502 + // if this request was forwarded, or user-agent matches, then don't forward 503 + req := c.Request() 504 + for _, via := range req.Header.Values("Via") { 505 + if strings.Contains(via, "atproto-relay") { 506 + s.logger.Info("not re-forwarding request to sibling relay", "header", "Via", "value", via) 507 + return 508 + } 509 + } 510 + for _, ua := range req.Header.Values("User-Agent") { 511 + if strings.Contains(ua, "atproto-relay") { 512 + s.logger.Info("not re-forwarding request to sibling relay", "header", "User-Agent", "value", ua) 513 + return 514 + } 515 + } 516 + 517 + client := http.Client{ 518 + Timeout: 10 * time.Second, 519 + } 520 + 521 + for _, rawHost := range s.config.SiblingRelayHosts { 522 + hostname, noSSL, err := relay.ParseHostname(rawHost) 523 + if err != nil { 524 + s.logger.Error("invalid sibling hostname configured", "host", rawHost, "err", err) 525 + return 526 + } 527 + u := req.URL 528 + u.Host = hostname 529 + if noSSL { 530 + u.Scheme = "http" 531 + } else { 532 + u.Scheme = "https" 533 + } 534 + upstreamReq, err := http.NewRequest(req.Method, u.String(), req.Body) 535 + if err != nil { 536 + s.logger.Error("creating admin forward request failed", "method", req.Method, "url", u.String(), "err", err) 537 + continue 538 + } 539 + 540 + // copy some headers from inbound request 541 + for k, vals := range req.Header { 542 + if strings.ToLower(k) == "accept" || strings.ToLower(k) == "authentication" { 543 + upstreamReq.Header.Add(k, vals[0]) 544 + } 545 + } 546 + upstreamReq.Header.Add("User-Agent", s.relay.Config.UserAgent) 547 + upstreamReq.Header.Add("Forwarded", "by=relay") 548 + 549 + upstreamResp, err := client.Do(upstreamReq) 550 + if err != nil { 551 + s.logger.Error("forwarded admin HTTP request failed", "method", req.Method, "url", u.String(), "err", err) 552 + continue 553 + } 554 + upstreamResp.Body.Close() 555 + if upstreamResp.StatusCode != http.StatusOK { 556 + s.logger.Error("forwarded admin HTTP request failed", "method", req.Method, "url", u.String(), "statusCode", upstreamResp.StatusCode) 557 + continue 558 + } 559 + s.logger.Info("successfully forwarded admin HTTP request", "method", req.Method, "url", u.String()) 560 + } 561 + }