this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

auth tokens misc cleanups and developer tweaks

+66 -26
+18 -14
cmd/rainbow/main.go
··· 99 99 EnvVars: []string{"RAINBOW_RC_ERR_LIMIT"}, 100 100 Value: 10, 101 101 }, 102 + &cli.StringSliceFlag{ 103 + Name: "auth-tokens", 104 + Usage: "strings to find in Authorization: HTTP header to allow requestRequestCrawl (comma separated list)", 105 + EnvVars: []string{"RAINBOW_AUTH_TOKENS"}, 106 + }, 107 + &cli.BoolFlag{ 108 + Name: "skip-request-crawl-ping", 109 + Usage: "development flag to not bother the world with development", 110 + }, 102 111 } 103 112 104 113 // TODO: slog.SetDefault and set module `var log *slog.Logger` based on flags and env ··· 158 167 159 168 var spl *splitter.Splitter 160 169 var err error 170 + conf := splitter.SplitterConfig{ 171 + UpstreamHost: upstreamHost, 172 + CursorFile: cctx.String("cursor-file"), 173 + MaxRequestCrawlForwardErrors: cctx.Int("max-request-crawl-errors"), 174 + AuthTokens: cctx.StringSlice("auth-tokens"), 175 + SkipRequestCrawlPing: cctx.Bool("skip-request-crawl-ping"), 176 + } 161 177 if persistPath != "" { 162 178 log.Info("building splitter with storage at", "path", persistPath) 163 - ppopts := pebblepersist.PebblePersistOptions{ 179 + conf.PebbleOptions = &pebblepersist.PebblePersistOptions{ 164 180 DbPath: persistPath, 165 181 PersistDuration: time.Duration(float64(time.Hour) * cctx.Float64("persist-hours")), 166 182 GCPeriod: 5 * time.Minute, 167 183 MaxBytes: uint64(cctx.Int64("persist-bytes")), 168 184 } 169 - conf := splitter.SplitterConfig{ 170 - UpstreamHost: upstreamHost, 171 - CursorFile: cctx.String("cursor-file"), 172 - PebbleOptions: &ppopts, 173 - MaxRequestCrawlForwardErrors: cctx.Int("max-request-crawl-errors"), 174 - } 175 - spl, err = splitter.NewSplitter(conf, nextCrawlers) 176 185 } else { 177 186 log.Info("building in-memory splitter") 178 - conf := splitter.SplitterConfig{ 179 - UpstreamHost: upstreamHost, 180 - CursorFile: cctx.String("cursor-file"), 181 - MaxRequestCrawlForwardErrors: cctx.Int("max-request-crawl-errors"), 182 - } 183 - spl, err = splitter.NewSplitter(conf, nextCrawlers) 184 187 } 188 + spl, err = splitter.NewSplitter(conf, nextCrawlers) 185 189 if err != nil { 186 190 log.Error("failed to create splitter", "path", persistPath, "error", err) 187 191 os.Exit(1)
+48 -12
splitter/splitter.go
··· 71 71 PebbleOptions *pebblepersist.PebblePersistOptions 72 72 73 73 MaxRequestCrawlForwardErrors int 74 + 75 + AuthTokens []string 76 + 77 + SkipRequestCrawlPing bool 74 78 } 75 79 76 80 func (sc *SplitterConfig) XrpcRootUrl() string { ··· 352 356 353 357 clientHost := fmt.Sprintf("%s://%s", u.Scheme, host) 354 358 355 - xrpcC := &xrpc.Client{ 356 - Host: clientHost, 357 - Client: http.DefaultClient, // not using the client that auto-retries 358 - } 359 + if s.conf.SkipRequestCrawlPing { 360 + s.log.Warn("development mode, skipping pds describeServer ping") 361 + } else { 362 + xrpcC := &xrpc.Client{ 363 + Host: clientHost, 364 + Client: http.DefaultClient, // not using the client that auto-retries 365 + } 359 366 360 - desc, err := atproto.ServerDescribeServer(ctx, xrpcC) 361 - if err != nil { 362 - errMsg := fmt.Sprintf("requested host (%s) failed to respond to describe request", clientHost) 363 - return echo.NewHTTPError(http.StatusBadRequest, errMsg) 367 + desc, err := atproto.ServerDescribeServer(ctx, xrpcC) 368 + if err != nil { 369 + errMsg := fmt.Sprintf("requested host (%s) failed to respond to describe request", clientHost) 370 + return echo.NewHTTPError(http.StatusBadRequest, errMsg) 371 + } 372 + 373 + // Maybe we could do something with this response later 374 + _ = desc 364 375 } 365 - 366 - // Maybe we could do something with this response later 367 - _ = desc 368 376 369 377 bodyBlob, err := json.Marshal(body) 370 378 if err != nil { ··· 399 407 // POST /v1/requestRequestCrawl body={"url":""} 400 408 // URL path will be replaced with /xrpc/com.atproto.sync.requestCrawl to receieve forwarded requestCrawl messages 401 409 func (s *Splitter) RequestRequestCrawlHandler(c echo.Context) error { 410 + authHeader := c.Request().Header.Get("Authorization") 411 + authorized := false 412 + for _, token := range s.conf.AuthTokens { 413 + if strings.Contains(authHeader, token) { 414 + authorized = true 415 + break 416 + } 417 + } 418 + if !authorized { 419 + return c.JSON(http.StatusUnauthorized, MessageResponse{Message: "nope"}) 420 + } 402 421 var requestBody RequestRequestCrawlRequest 403 422 err := c.Bind(&requestBody) 404 423 if err != nil { ··· 722 741 } 723 742 } 724 743 744 + if s.conf.CursorFile == "" { 745 + return -1, nil 746 + } 725 747 fi, err := os.Open(s.conf.CursorFile) 726 748 if err != nil { 727 749 if os.IsNotExist(err) { ··· 744 766 } 745 767 746 768 func (s *Splitter) writeCursor(curs int64) error { 769 + if s.conf.CursorFile == "" { 770 + return nil 771 + } 747 772 return os.WriteFile(s.conf.CursorFile, []byte(fmt.Sprint(curs)), 0664) 748 773 } 749 774 ··· 764 789 765 790 var ErrDuplicateClient = errors.New("duplicate client") 766 791 792 + var requestCrawlPath *url.URL 793 + 794 + func init() { 795 + var err error 796 + requestCrawlPath, err = url.Parse("/xrpc/com.atproto.sync.requestCrawl") 797 + if err != nil { 798 + panic(err) 799 + } 800 + } 801 + 767 802 func (s *Splitter) newRCClient(baseUrl string, permanent bool) error { 768 803 xu, err := url.Parse(baseUrl) 769 804 if err != nil { ··· 771 806 } 772 807 ctx, cancel := context.WithCancel(s.ctx) 773 808 rcc := &rcClient{ 774 - requestCrawlUrl: xu.JoinPath("/xrpc/com.atproto.sync.requestCrawl").String(), 809 + requestCrawlUrl: xu.ResolveReference(requestCrawlPath).String(), 775 810 postBodies: make(chan []byte, 100), 776 811 log: s.log, 777 812 splitter: s, ··· 787 822 } 788 823 } 789 824 s.requestCrawlClients = append(s.requestCrawlClients, rcc) 825 + s.log.Info("new rcc", "url", rcc.requestCrawlUrl) 790 826 go rcc.forwarderThread() 791 827 return nil 792 828 }