this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

simple proxy of listRepos to rainbow upstream

authored by

Brian Olson and committed by
Brian Olson
eab52046 3232473a

+53 -5
+6 -5
cmd/rainbow/main.go
··· 43 43 } 44 44 45 45 app.Flags = []cli.Flag{ 46 - &cli.BoolFlag{ 47 - Name: "crawl-insecure-ws", 48 - Usage: "when connecting to PDS instances, use ws:// instead of wss://", 49 - EnvVars: []string{"RAINBOW_INSECURE_CRAWL"}, 50 - }, 46 + // TODO: unimplemented, always assumes https:// and wss:// 47 + //&cli.BoolFlag{ 48 + // Name: "crawl-insecure-ws", 49 + // Usage: "when connecting to PDS instances, use ws:// instead of wss://", 50 + // EnvVars: []string{"RAINBOW_INSECURE_CRAWL"}, 51 + //}, 51 52 &cli.StringFlag{ 52 53 Name: "splitter-host", 53 54 Value: "bsky.network",
+47
splitter/splitter.go
··· 6 6 "encoding/json" 7 7 "errors" 8 8 "fmt" 9 + "go.opentelemetry.io/otel" 9 10 "io" 10 11 "log/slog" 11 12 "math/rand" ··· 57 58 PebbleOptions *events.PebblePersistOptions 58 59 } 59 60 61 + func (sc *SplitterConfig) XrpcRootUrl() string { 62 + if strings.HasPrefix(sc.UpstreamHost, "http://") { 63 + return sc.UpstreamHost 64 + } 65 + if strings.HasPrefix(sc.UpstreamHost, "https://") { 66 + return sc.UpstreamHost 67 + } 68 + if strings.HasPrefix(sc.UpstreamHost, "ws://") { 69 + return "http://" + sc.UpstreamHost[5:] 70 + } 71 + if strings.HasPrefix(sc.UpstreamHost, "wss://") { 72 + return "https://" + sc.UpstreamHost[6:] 73 + } 74 + return "https://" + sc.UpstreamHost 75 + } 76 + 60 77 func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error) { 61 78 var nextCrawlerURLs []*url.URL 62 79 log := slog.Default().With("system", "splitter") ··· 207 224 208 225 e.POST("/xrpc/com.atproto.sync.requestCrawl", s.RequestCrawlHandler) 209 226 e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler) 227 + e.GET("/xrpc/com.atproto.sync.listRepos", s.HandleComAtprotoSyncListRepos) 210 228 211 229 e.GET("/xrpc/_health", s.HandleHealthCheck) 212 230 e.GET("/_health", s.HandleHealthCheck) ··· 327 345 } 328 346 329 347 return c.JSON(200, HealthStatus{Status: "ok"}) 348 + } 349 + 350 + func (s *Splitter) HandleComAtprotoSyncListRepos(c echo.Context) error { 351 + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListRepos") 352 + defer span.End() 353 + 354 + cursorQuery := c.QueryParam("cursor") 355 + limitQuery := c.QueryParam("limit") 356 + 357 + var err error 358 + 359 + limit := int64(500) 360 + if limitQuery != "" { 361 + limit, err = strconv.ParseInt(limitQuery, 10, 64) 362 + if err != nil || limit < 1 || limit > 1000 { 363 + return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid limit: %s", limitQuery)}) 364 + } 365 + } 366 + 367 + client := xrpc.Client{ 368 + Client: s.httpC, 369 + Host: s.conf.XrpcRootUrl(), 370 + } 371 + 372 + out, handleErr := atproto.SyncListRepos(ctx, &client, cursorQuery, limit) 373 + if handleErr != nil { 374 + return handleErr 375 + } 376 + return c.JSON(200, out) 330 377 } 331 378 332 379 func (s *Splitter) EventsHandler(c echo.Context) error {