this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Handle requestCrawl fanout in Rainbow

authored by

Jaz Volpert and committed by
Brian Olson
8bd13ea6 b1252485

+141 -39
+12 -6
cmd/rainbow/main.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "github.com/bluesky-social/indigo/events" 6 5 "log/slog" 7 6 _ "net/http/pprof" 8 7 "os" ··· 10 9 "syscall" 11 10 "time" 12 11 12 + "github.com/bluesky-social/indigo/events" 13 13 "github.com/bluesky-social/indigo/splitter" 14 14 15 + "github.com/carlmjohnson/versioninfo" 15 16 _ "github.com/joho/godotenv/autoload" 16 - _ "go.uber.org/automaxprocs" 17 - 18 - "github.com/carlmjohnson/versioninfo" 19 17 "github.com/urfave/cli/v2" 20 18 "go.opentelemetry.io/otel" 21 19 "go.opentelemetry.io/otel/attribute" ··· 23 21 "go.opentelemetry.io/otel/sdk/resource" 24 22 tracesdk "go.opentelemetry.io/otel/sdk/trace" 25 23 semconv "go.opentelemetry.io/otel/semconv/v1.4.0" 24 + _ "go.uber.org/automaxprocs" 26 25 ) 27 26 28 27 var log = slog.Default().With("system", "rainbow") ··· 88 87 Usage: "max bytes target for event cache, 0 to disable size target trimming", 89 88 EnvVars: []string{"RAINBOW_PERSIST_BYTES", "SPLITTER_PERSIST_BYTES"}, 90 89 }, 90 + &cli.StringSliceFlag{ 91 + Name: "next-crawler", 92 + Usage: "forward POST requestCrawl to this url, should be machine root url and not xrpc/requestCrawl, comma separated list", 93 + EnvVars: []string{"RELAY_NEXT_CRAWLER"}, 94 + }, 91 95 } 92 96 93 97 // TODO: slog.SetDefault and set module `var log *slog.Logger` based on flags and env ··· 143 147 144 148 persistPath := cctx.String("persist-db") 145 149 upstreamHost := cctx.String("splitter-host") 150 + nextCrawlers := cctx.StringSlice("next-crawler") 151 + 146 152 var spl *splitter.Splitter 147 153 var err error 148 154 if persistPath != "" { ··· 158 164 CursorFile: cctx.String("cursor-file"), 159 165 PebbleOptions: &ppopts, 160 166 } 161 - spl, err = splitter.NewSplitter(conf) 167 + spl, err = splitter.NewSplitter(conf, nextCrawlers) 162 168 } else { 163 169 log.Info("building in-memory splitter") 164 170 conf := splitter.SplitterConfig{ 165 171 UpstreamHost: upstreamHost, 166 172 CursorFile: cctx.String("cursor-file"), 167 173 } 168 - spl, err = splitter.NewSplitter(conf) 174 + spl, err = splitter.NewSplitter(conf, nextCrawlers) 169 175 } 170 176 if err != nil { 171 177 log.Error("failed to create splitter", "path", persistPath, "error", err)
+5
indexer/metrics.go
··· 44 44 Name: "indexer_catchup_repos", 45 45 Help: "Number of repos waiting on catchup", 46 46 }) 47 + 48 + var usersAddedToCatchupQueue = promauto.NewCounter(prometheus.CounterOpts{ 49 + Name: "indexer_users_added_to_catchup_queue", 50 + Help: "Number of users added to catchup queue", 51 + })
+124 -33
splitter/splitter.go
··· 1 1 package splitter 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 6 + "encoding/json" 5 7 "errors" 6 8 "fmt" 7 9 "io" ··· 9 11 "math/rand" 10 12 "net" 11 13 "net/http" 14 + "net/url" 12 15 "os" 13 16 "strconv" 14 17 "strings" 15 18 "sync" 16 19 "time" 17 20 21 + "github.com/bluesky-social/indigo/api/atproto" 22 + comatproto "github.com/bluesky-social/indigo/api/atproto" 18 23 "github.com/bluesky-social/indigo/bgs" 19 24 events "github.com/bluesky-social/indigo/events" 20 25 "github.com/bluesky-social/indigo/events/schedulers/sequential" 26 + "github.com/bluesky-social/indigo/util" 27 + "github.com/bluesky-social/indigo/xrpc" 21 28 "github.com/gorilla/websocket" 22 29 "github.com/labstack/echo/v4" 23 30 "github.com/labstack/echo/v4/middleware" ··· 39 46 conf SplitterConfig 40 47 41 48 log *slog.Logger 49 + 50 + httpC *http.Client 51 + nextCrawlers []*url.URL 42 52 } 43 53 44 54 type SplitterConfig struct { ··· 47 57 PebbleOptions *events.PebblePersistOptions 48 58 } 49 59 50 - func NewMemSplitter(host string) *Splitter { 51 - conf := SplitterConfig{ 52 - UpstreamHost: host, 53 - CursorFile: "cursor-file", 60 + func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error) { 61 + var nextCrawlerURLs []*url.URL 62 + log := slog.Default().With("system", "splitter") 63 + if len(nextCrawlers) > 0 { 64 + nextCrawlerURLs = make([]*url.URL, len(nextCrawlers)) 65 + for i, tu := range nextCrawlers { 66 + var err error 67 + nextCrawlerURLs[i], err = url.Parse(tu) 68 + if err != nil { 69 + return nil, fmt.Errorf("failed to parse next-crawler url: %w", err) 70 + } 71 + log.Info("configuring relay for requestCrawl", "host", nextCrawlerURLs[i]) 72 + } 54 73 } 55 74 56 - erb := NewEventRingBuffer(20_000, 10_000) 75 + s := &Splitter{ 76 + conf: conf, 77 + consumers: make(map[uint64]*SocketConsumer), 78 + log: log, 79 + httpC: util.RobustHTTPClient(), 80 + nextCrawlers: nextCrawlerURLs, 81 + } 57 82 58 - em := events.NewEventManager(erb) 59 - return &Splitter{ 60 - conf: conf, 61 - erb: erb, 62 - events: em, 63 - consumers: make(map[uint64]*SocketConsumer), 64 - log: slog.Default().With("system", "splitter"), 65 - } 66 - } 67 - func NewSplitter(conf SplitterConfig) (*Splitter, error) { 68 83 if conf.PebbleOptions == nil { 69 84 // mem splitter 70 85 erb := NewEventRingBuffer(20_000, 10_000) 71 - 72 - em := events.NewEventManager(erb) 73 - return &Splitter{ 74 - conf: conf, 75 - erb: erb, 76 - events: em, 77 - consumers: make(map[uint64]*SocketConsumer), 78 - log: slog.Default().With("system", "splitter"), 79 - }, nil 86 + s.erb = erb 87 + s.events = events.NewEventManager(erb) 80 88 } else { 81 89 pp, err := events.NewPebblePersistance(conf.PebbleOptions) 82 90 if err != nil { 83 91 return nil, err 84 92 } 85 - 86 93 go pp.GCThread(context.Background()) 87 - em := events.NewEventManager(pp) 88 - return &Splitter{ 89 - conf: conf, 90 - pp: pp, 91 - events: em, 92 - consumers: make(map[uint64]*SocketConsumer), 93 - log: slog.Default().With("system", "splitter"), 94 - }, nil 94 + s.pp = pp 95 + s.events = events.NewEventManager(pp) 95 96 } 97 + 98 + return s, nil 96 99 } 97 100 func NewDiskSplitter(host, path string, persistHours float64, maxBytes int64) (*Splitter, error) { 98 101 ppopts := events.PebblePersistOptions{ ··· 200 203 } 201 204 } 202 205 206 + // TODO: this API is temporary until we formalize what we want here 207 + 208 + e.GET("/xrpc/com.atproto.sync.requestCrawl", s.RequestCrawlHandler) 203 209 e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler) 204 210 205 211 e.GET("/xrpc/_health", s.HandleHealthCheck) ··· 236 242 237 243 func (s *Splitter) HandleHomeMessage(c echo.Context) error { 238 244 return c.String(http.StatusOK, homeMessage) 245 + } 246 + 247 + type XRPCError struct { 248 + Message string `json:"message"` 249 + } 250 + 251 + func (s *Splitter) RequestCrawlHandler(c echo.Context) error { 252 + ctx := c.Request().Context() 253 + var body comatproto.SyncRequestCrawl_Input 254 + if err := c.Bind(&body); err != nil { 255 + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid body: %s", err)}) 256 + } 257 + 258 + host := body.Hostname 259 + if host == "" { 260 + return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname") 261 + } 262 + 263 + if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") { 264 + host = "https://" + host 265 + } 266 + 267 + u, err := url.Parse(host) 268 + if err != nil { 269 + return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname") 270 + } 271 + 272 + if u.Scheme == "http" { 273 + return echo.NewHTTPError(http.StatusBadRequest, "this server requires https") 274 + } 275 + if u.Path != "" { 276 + return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path") 277 + } 278 + 279 + if u.Query().Encode() != "" { 280 + return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without query") 281 + } 282 + 283 + host = u.Host // potentially hostname:port 284 + 285 + clientHost := fmt.Sprintf("%s://%s", u.Scheme, host) 286 + 287 + xrpcC := &xrpc.Client{ 288 + Host: clientHost, 289 + Client: http.DefaultClient, // not using the client that auto-retries 290 + } 291 + 292 + desc, err := atproto.ServerDescribeServer(ctx, xrpcC) 293 + if err != nil { 294 + errMsg := fmt.Sprintf("requested host (%s) failed to respond to describe request", clientHost) 295 + return echo.NewHTTPError(http.StatusBadRequest, errMsg) 296 + } 297 + 298 + // Maybe we could do something with this response later 299 + _ = desc 300 + 301 + if len(s.nextCrawlers) != 0 { 302 + blob, err := json.Marshal(body) 303 + if err != nil { 304 + s.log.Warn("could not forward requestCrawl, json err", "err", err) 305 + } else { 306 + go func(bodyBlob []byte) { 307 + for _, remote := range s.nextCrawlers { 308 + if remote == nil { 309 + continue 310 + } 311 + 312 + pu := remote.JoinPath("/xrpc/com.atproto.sync.requestCrawl") 313 + response, err := s.httpC.Post(pu.String(), "application/json", bytes.NewReader(bodyBlob)) 314 + if response != nil && response.Body != nil { 315 + response.Body.Close() 316 + } 317 + if err != nil || response == nil { 318 + s.log.Warn("requestCrawl forward failed", "host", remote, "err", err) 319 + } else if response.StatusCode != http.StatusOK { 320 + s.log.Warn("requestCrawl forward failed", "host", remote, "status", response.Status) 321 + } else { 322 + s.log.Info("requestCrawl forward successful", "host", remote) 323 + } 324 + } 325 + }(blob) 326 + } 327 + } 328 + 329 + return c.JSON(200, HealthStatus{Status: "ok"}) 239 330 } 240 331 241 332 func (s *Splitter) EventsHandler(c echo.Context) error {