this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

cleanup upstream url handling --splitter-host should work for any http/https/ws/wss url

authored by

Brian Olson and committed by
Brian Olson
c6370ea6 6aa979fa

+38 -11
+38 -11
splitter/splitter.go
··· 75 75 return "https://" + sc.UpstreamHost 76 76 } 77 77 78 + func (sc *SplitterConfig) UpstreamUrl() string { 79 + if strings.HasPrefix(sc.UpstreamHost, "http://") { 80 + return "http://" + sc.UpstreamHost[7:] 81 + } 82 + if strings.HasPrefix(sc.UpstreamHost, "https://") { 83 + return "https://" + sc.UpstreamHost[8:] 84 + } 85 + if strings.HasPrefix(sc.UpstreamHost, "ws://") { 86 + return sc.UpstreamHost 87 + } 88 + if strings.HasPrefix(sc.UpstreamHost, "wss://") { 89 + return sc.UpstreamHost 90 + } 91 + return "wss://" + sc.UpstreamHost 92 + } 93 + 78 94 func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error) { 79 95 var nextCrawlerURLs []*url.URL 80 96 log := slog.Default().With("system", "splitter") ··· 88 104 } 89 105 log.Info("configuring relay for requestCrawl", "host", nextCrawlerURLs[i]) 90 106 } 107 + } 108 + 109 + _, err := url.Parse(conf.UpstreamUrl()) 110 + if err != nil { 111 + return nil, fmt.Errorf("failed to parse upstream url %#v: %w", conf.UpstreamUrl(), err) 91 112 } 92 113 93 114 s := &Splitter{ ··· 153 174 return fmt.Errorf("loading cursor failed: %w", err) 154 175 } 155 176 156 - go s.subscribeWithRedialer(context.Background(), s.conf.UpstreamHost, curs) 177 + go s.subscribeWithRedialer(context.Background(), curs) 157 178 158 179 li, err := lc.Listen(ctx, "tcp", addr) 159 180 if err != nil { ··· 570 591 return time.Second * 5 571 592 } 572 593 573 - func (s *Splitter) subscribeWithRedialer(ctx context.Context, host string, cursor int64) { 594 + func (s *Splitter) subscribeWithRedialer(ctx context.Context, cursor int64) { 574 595 d := websocket.Dialer{} 575 596 576 - protocol := "wss" 597 + upstreamUrl, err := url.Parse(s.conf.UpstreamUrl()) 598 + if err != nil { 599 + panic(err) // this should have been checked in NewSplitter 600 + } 601 + upstreamUrl = upstreamUrl.JoinPath("/xrpc/com.atproto.sync.subscribeRepos") 577 602 578 603 var backoff int 579 604 for { ··· 587 612 "User-Agent": []string{"bgs-rainbow-v0"}, 588 613 } 589 614 590 - var url string 615 + var uurl string 591 616 if cursor < 0 { 592 - url = fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos", protocol, host) 617 + upstreamUrl.RawQuery = "" 618 + uurl = upstreamUrl.String() 593 619 } else { 594 - url = fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", protocol, host, cursor) 620 + upstreamUrl.RawQuery = fmt.Sprintf("cursor=%d", cursor) 621 + uurl = upstreamUrl.String() 595 622 } 596 - con, res, err := d.DialContext(ctx, url, header) 623 + con, res, err := d.DialContext(ctx, uurl, header) 597 624 if err != nil { 598 - s.log.Warn("dialing failed", "host", host, "err", err, "backoff", backoff) 625 + s.log.Warn("dialing failed", "host", uurl, "err", err, "backoff", backoff) 599 626 time.Sleep(sleepForBackoff(backoff)) 600 627 backoff++ 601 628 ··· 604 631 605 632 s.log.Info("event subscription response", "code", res.StatusCode) 606 633 607 - if err := s.handleConnection(ctx, host, con, &cursor); err != nil { 608 - s.log.Warn("connection failed", "host", host, "err", err) 634 + if err := s.handleConnection(ctx, con, &cursor); err != nil { 635 + s.log.Warn("connection failed", "host", uurl, "err", err) 609 636 } 610 637 } 611 638 } 612 639 613 - func (s *Splitter) handleConnection(ctx context.Context, host string, con *websocket.Conn, lastCursor *int64) error { 640 + func (s *Splitter) handleConnection(ctx context.Context, con *websocket.Conn, lastCursor *int64) error { 614 641 ctx, cancel := context.WithCancel(ctx) 615 642 defer cancel() 616 643