this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

rainbow: add proxying to collectiondir

+50 -23
+21 -10
cmd/rainbow/main.go
··· 52 52 &cli.StringFlag{ 53 53 Name: "upstream-host", 54 54 Value: "http://localhost:2470", 55 - Usage: "simple hostname (no URI scheme) of the upstream host (eg, relay)", 55 + Usage: "URL (schema and hostname, no path) of the upstream host (eg, relay)", 56 56 EnvVars: []string{"ATP_RELAY_HOST", "RAINBOW_RELAY_HOST"}, 57 57 }, 58 58 &cli.StringFlag{ ··· 90 90 EnvVars: []string{"RAINBOW_PERSIST_BYTES", "SPLITTER_PERSIST_BYTES"}, 91 91 }, 92 92 &cli.StringSliceFlag{ 93 + // TODO: better name for this argument 93 94 Name: "next-crawler", 94 - Usage: "forward POST requestCrawl to this url, should be machine root url and not xrpc/requestCrawl, comma separated list", 95 + Usage: "forward POST requestCrawl to these hosts (schema and host, no path) in addition to upstream-host. Comma-separated or multiple flags", 95 96 EnvVars: []string{"RAINBOW_NEXT_CRAWLER", "RELAY_NEXT_CRAWLER"}, 96 97 }, 97 98 &cli.StringFlag{ 99 + Name: "collectiondir-host", 100 + Value: "http://localhost:2510", 101 + Usage: "host (schema and hostname, no path) of upstream collectiondir instance, for com.atproto.sync.listReposByCollection", 102 + EnvVars: []string{"RAINBOW_COLLECTIONDIR_HOST"}, 103 + }, 104 + &cli.StringFlag{ 98 105 Name: "env", 99 106 Usage: "operating environment (eg, 'prod', 'test')", 100 107 Value: "dev", 101 108 EnvVars: []string{"ENVIRONMENT"}, 102 109 }, 103 110 &cli.BoolFlag{ 104 - Name: "enable-otel-otlp", 105 - Usage: "enables OTEL OTLP exporter endpoint", 111 + Name: "enable-otel-otlp", 112 + Usage: "enables OTEL OTLP exporter endpoint", 113 + EnvVars: []string{"RAINBOW_ENABLE_OTEL_OTLP", "ENABLE_OTEL_OTLP"}, 106 114 }, 107 115 &cli.StringFlag{ 108 116 Name: "otel-otlp-endpoint", ··· 160 168 161 169 persistPath := cctx.String("persist-db") 162 170 upstreamHost := cctx.String("upstream-host") 171 + collectionDirHost := cctx.String("collectiondir-host") 163 172 nextCrawlers := cctx.StringSlice("next-crawler") 164 173 165 174 var spl *splitter.Splitter ··· 173 182 MaxBytes: uint64(cctx.Int64("persist-bytes")), 174 183 } 175 184 conf := splitter.SplitterConfig{ 176 - UpstreamHost: upstreamHost, 177 - CursorFile: cctx.String("cursor-file"), 178 - PebbleOptions: &ppopts, 179 - UserAgent: fmt.Sprintf("rainbow/%s", versioninfo.Short()), 185 + UpstreamHost: upstreamHost, 186 + CollectionDirHost: collectionDirHost, 187 + CursorFile: cctx.String("cursor-file"), 188 + PebbleOptions: &ppopts, 189 + UserAgent: fmt.Sprintf("rainbow/%s", versioninfo.Short()), 180 190 } 181 191 spl, err = splitter.NewSplitter(conf, nextCrawlers) 182 192 } else { 183 193 logger.Info("building in-memory splitter") 184 194 conf := splitter.SplitterConfig{ 185 - UpstreamHost: upstreamHost, 186 - CursorFile: cctx.String("cursor-file"), 195 + UpstreamHost: upstreamHost, 196 + CollectionDirHost: collectionDirHost, 197 + CursorFile: cctx.String("cursor-file"), 187 198 } 188 199 spl, err = splitter.NewSplitter(conf, nextCrawlers) 189 200 }
+20 -7
splitter/handlers.go
··· 84 84 85 85 // Proxies a request to the single upstream (relay) 86 86 func (s *Splitter) ProxyRequestUpstream(c echo.Context) error { 87 + u, err := url.Parse(s.conf.XrpcRootUrl()) 88 + if err != nil { 89 + return err 90 + } 91 + return s.ProxyRequest(c, u.Host, u.Scheme) 92 + } 93 + 94 + // Proxies a request to the collectiondir 95 + func (s *Splitter) ProxyRequestCollectionDir(c echo.Context) error { 96 + u, err := url.Parse(s.conf.CollectionDirHost) 97 + if err != nil { 98 + return err 99 + } 100 + return s.ProxyRequest(c, u.Host, u.Scheme) 101 + } 102 + 103 + func (s *Splitter) ProxyRequest(c echo.Context, hostname, scheme string) error { 87 104 88 105 req := c.Request() 89 106 respWriter := c.Response() 90 107 91 108 u := req.URL 92 - usu, err := url.Parse(s.conf.XrpcRootUrl()) 93 - if err != nil { 94 - return err 95 - } 96 - u.Scheme = usu.Scheme 97 - u.Host = usu.Host 98 - upstreamReq, err := http.NewRequest(req.Method, req.URL.String(), req.Body) 109 + u.Scheme = scheme 110 + u.Host = hostname 111 + upstreamReq, err := http.NewRequest(req.Method, u.String(), req.Body) 99 112 if err != nil { 100 113 s.logger.Warn("proxy request failed", "err", err) 101 114 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "failed to proxy to upstream relay"})
+9 -6
splitter/splitter.go
··· 47 47 } 48 48 49 49 type SplitterConfig struct { 50 - UpstreamHost string 51 - CursorFile string 52 - UserAgent string 53 - PebbleOptions *pebblepersist.PebblePersistOptions 54 - Logger *slog.Logger 50 + UpstreamHost string 51 + CollectionDirHost string 52 + CursorFile string 53 + UserAgent string 54 + PebbleOptions *pebblepersist.PebblePersistOptions 55 + Logger *slog.Logger 55 56 } 56 57 57 58 func (sc *SplitterConfig) XrpcRootUrl() string { ··· 197 198 e.GET("/xrpc/com.atproto.sync.listHosts", s.ProxyRequestUpstream) 198 199 e.GET("/xrpc/com.atproto.sync.getHostStatus", s.ProxyRequestUpstream) 199 200 e.GET("/xrpc/com.atproto.sync.getRepo", s.ProxyRequestUpstream) 200 - // TODO: proxy listReposByCollection to a different host? 201 + 202 + // proxy endpoint to collectiondir 203 + e.GET("/xrpc/com.atproto.sync.listReposByCollection", s.ProxyRequestCollectionDir) 201 204 202 205 e.GET("/xrpc/_health", s.HandleHealthCheck) 203 206 e.GET("/_health", s.HandleHealthCheck)