this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

rainbow: small refactors; improve upstream proxying and requestCrawl forwarding

+416 -431
+4 -2
cmd/rainbow/main.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "fmt" 5 6 "log/slog" 6 7 "os" 7 8 "os/signal" ··· 50 51 }, 51 52 &cli.StringFlag{ 52 53 Name: "upstream-host", 53 - Value: "bsky.network", 54 + Value: "http://localhost:2470", 54 55 Usage: "simple hostname (no URI scheme) of the upstream host (eg, relay)", 55 56 EnvVars: []string{"ATP_RELAY_HOST", "RAINBOW_RELAY_HOST"}, 56 57 }, ··· 175 176 UpstreamHost: upstreamHost, 176 177 CursorFile: cctx.String("cursor-file"), 177 178 PebbleOptions: &ppopts, 179 + UserAgent: fmt.Sprintf("rainbow/%s", versioninfo.Short()), 178 180 } 179 181 spl, err = splitter.NewSplitter(conf, nextCrawlers) 180 182 } else { ··· 203 205 runErr := make(chan error, 1) 204 206 205 207 go func() { 206 - err := spl.Start(cctx.String("api-listen")) 208 + err := spl.StartAPI(cctx.String("api-listen")) 207 209 runErr <- err 208 210 }() 209 211
+198
splitter/firehose.go
··· 1 + package splitter 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "net" 7 + "strconv" 8 + "sync" 9 + "time" 10 + 11 + "github.com/bluesky-social/indigo/events" 12 + 13 + "github.com/gorilla/websocket" 14 + "github.com/labstack/echo/v4" 15 + "github.com/prometheus/client_golang/prometheus" 16 + dto "github.com/prometheus/client_model/go" 17 + ) 18 + 19 + func (s *Splitter) HandleSubscribeRepos(c echo.Context) error { 20 + var since *int64 21 + if sinceVal := c.QueryParam("cursor"); sinceVal != "" { 22 + sval, err := strconv.ParseInt(sinceVal, 10, 64) 23 + if err != nil { 24 + return err 25 + } 26 + since = &sval 27 + } 28 + 29 + ctx, cancel := context.WithCancel(c.Request().Context()) 30 + defer cancel() 31 + 32 + // TODO: authhhh 33 + conn, err := websocket.Upgrade(c.Response(), c.Request(), c.Response().Header(), 10<<10, 10<<10) 34 + if err != nil { 35 + return fmt.Errorf("upgrading websocket: %w", err) 36 + } 37 + 38 + defer conn.Close() 39 + 40 + lastWriteLk := sync.Mutex{} 41 + lastWrite := time.Now() 42 + 43 + // Start a goroutine to ping the client every 30 seconds to check if it's 44 + // still alive. If the client doesn't respond to a ping within 5 seconds, 45 + // we'll close the connection and teardown the consumer. 46 + go func() { 47 + ticker := time.NewTicker(30 * time.Second) 48 + defer ticker.Stop() 49 + 50 + for { 51 + select { 52 + case <-ticker.C: 53 + lastWriteLk.Lock() 54 + lw := lastWrite 55 + lastWriteLk.Unlock() 56 + 57 + if time.Since(lw) < 30*time.Second { 58 + continue 59 + } 60 + 61 + if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)); err != nil { 62 + s.logger.Error("failed to ping client", "err", err) 63 + cancel() 64 + return 65 + } 66 + case <-ctx.Done(): 67 + return 68 + } 69 + } 70 + }() 71 + 72 + conn.SetPingHandler(func(message string) error { 73 + err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second*60)) 74 + if err == websocket.ErrCloseSent { 75 + return nil 76 + } else if e, ok := err.(net.Error); ok && e.Temporary() { 77 + return nil 78 + } 79 + return err 80 + }) 81 + 82 + // Start a goroutine to read messages from the client and discard them. 83 + go func() { 84 + for { 85 + _, _, err := conn.ReadMessage() 86 + if err != nil { 87 + s.logger.Error("failed to read message from client", "err", err) 88 + cancel() 89 + return 90 + } 91 + } 92 + }() 93 + 94 + ident := c.RealIP() + "-" + c.Request().UserAgent() 95 + 96 + evts, cleanup, err := s.events.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { return true }, since) 97 + if err != nil { 98 + return err 99 + } 100 + defer cleanup() 101 + 102 + // Keep track of the consumer for metrics and admin endpoints 103 + consumer := SocketConsumer{ 104 + RemoteAddr: c.RealIP(), 105 + UserAgent: c.Request().UserAgent(), 106 + ConnectedAt: time.Now(), 107 + } 108 + sentCounter := eventsSentCounter.WithLabelValues(consumer.RemoteAddr, consumer.UserAgent) 109 + consumer.EventsSent = sentCounter 110 + 111 + consumerID := s.registerConsumer(&consumer) 112 + defer s.cleanupConsumer(consumerID) 113 + 114 + s.logger.Info("new consumer", 115 + "remote_addr", consumer.RemoteAddr, 116 + "user_agent", consumer.UserAgent, 117 + "cursor", since, 118 + "consumer_id", consumerID, 119 + ) 120 + activeClientGauge.Inc() 121 + defer activeClientGauge.Dec() 122 + 123 + for { 124 + select { 125 + case evt, ok := <-evts: 126 + if !ok { 127 + s.logger.Error("event stream closed unexpectedly") 128 + return nil 129 + } 130 + 131 + wc, err := conn.NextWriter(websocket.BinaryMessage) 132 + if err != nil { 133 + s.logger.Error("failed to get next writer", "err", err) 134 + return err 135 + } 136 + 137 + if evt.Preserialized != nil { 138 + _, err = wc.Write(evt.Preserialized) 139 + } else { 140 + err = evt.Serialize(wc) 141 + } 142 + if err != nil { 143 + return fmt.Errorf("failed to write event: %w", err) 144 + } 145 + 146 + if err := wc.Close(); err != nil { 147 + s.logger.Warn("failed to flush-close our event write", "err", err) 148 + return nil 149 + } 150 + 151 + lastWriteLk.Lock() 152 + lastWrite = time.Now() 153 + lastWriteLk.Unlock() 154 + sentCounter.Inc() 155 + case <-ctx.Done(): 156 + return nil 157 + } 158 + } 159 + } 160 + 161 + type SocketConsumer struct { 162 + UserAgent string 163 + RemoteAddr string 164 + ConnectedAt time.Time 165 + EventsSent prometheus.Counter 166 + } 167 + 168 + func (s *Splitter) registerConsumer(c *SocketConsumer) uint64 { 169 + s.consumersLk.Lock() 170 + defer s.consumersLk.Unlock() 171 + 172 + id := s.nextConsumerID 173 + s.nextConsumerID++ 174 + 175 + s.consumers[id] = c 176 + 177 + return id 178 + } 179 + 180 + func (s *Splitter) cleanupConsumer(id uint64) { 181 + s.consumersLk.Lock() 182 + defer s.consumersLk.Unlock() 183 + 184 + c := s.consumers[id] 185 + 186 + var m = &dto.Metric{} 187 + if err := c.EventsSent.Write(m); err != nil { 188 + s.logger.Error("failed to get sent counter", "err", err) 189 + } 190 + 191 + s.logger.Info("consumer disconnected", 192 + "consumer_id", id, 193 + "remote_addr", c.RemoteAddr, 194 + "user_agent", c.UserAgent, 195 + "events_sent", m.Counter.GetValue()) 196 + 197 + delete(s.consumers, id) 198 + }
+125
splitter/handlers.go
··· 1 + package splitter 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "io" 7 + "net/http" 8 + "net/url" 9 + "strings" 10 + 11 + comatproto "github.com/bluesky-social/indigo/api/atproto" 12 + "github.com/bluesky-social/indigo/xrpc" 13 + 14 + "github.com/labstack/echo/v4" 15 + ) 16 + 17 + type HealthStatus struct { 18 + Service string `json:"service,const=rainbow"` 19 + Status string `json:"status"` 20 + Message string `json:"msg,omitempty"` 21 + } 22 + 23 + func (s *Splitter) HandleHealthCheck(c echo.Context) error { 24 + return c.JSON(http.StatusOK, HealthStatus{Status: "ok"}) 25 + } 26 + 27 + var homeMessage string = ` 28 + _ _ 29 + _ _ __ _(_)_ _ | |__ _____ __ __ 30 + | '_/ _' | | ' \| '_ \/ _ \ V V / 31 + |_| \__,_|_|_||_|_.__/\___/\_/\_/ 32 + 33 + This is an atproto [https://atproto.com] firehose fanout service, running the 'rainbow' codebase [https://github.com/bluesky-social/indigo] 34 + 35 + The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos 36 + ` 37 + 38 + func (s *Splitter) HandleHomeMessage(c echo.Context) error { 39 + return c.String(http.StatusOK, homeMessage) 40 + } 41 + 42 + func (s *Splitter) HandleComAtprotoSyncRequestCrawl(c echo.Context) error { 43 + ctx := c.Request().Context() 44 + var body comatproto.SyncRequestCrawl_Input 45 + if err := c.Bind(&body); err != nil { 46 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("invalid body: %s", err)}) 47 + } 48 + if body.Hostname == "" { 49 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "must include a hostname"}) 50 + } 51 + 52 + // first forward to the upstream 53 + xrpcc := xrpc.Client{ 54 + Client: s.upstreamClient, 55 + Host: s.conf.XrpcRootUrl(), 56 + } 57 + 58 + err := comatproto.SyncRequestCrawl(ctx, &xrpcc, &body) 59 + if err != nil { 60 + httpError, ok := err.(*xrpc.Error) 61 + if ok { 62 + return c.JSON(httpError.StatusCode, xrpc.XRPCError{ErrStr: "UpstreamError", Message: fmt.Sprintf("%s", httpError.Wrapped)}) 63 + } 64 + return c.JSON(http.StatusInternalServerError, xrpc.XRPCError{ErrStr: "ProxyRequestFailed", Message: fmt.Sprintf("failed forwarding request: %s", err)}) 65 + } 66 + 67 + // if that was successful, then forward on to the other upstreams (in goroutines) 68 + for _, c := range s.nextCrawlers { 69 + go func() { 70 + ctx := context.Background() 71 + xrpcc := xrpc.Client{ 72 + Client: s.upstreamClient, 73 + Host: c.String(), 74 + } 75 + if err := comatproto.SyncRequestCrawl(ctx, &xrpcc, &body); err != nil { 76 + s.logger.Warn("failed to forward requestCrawl", "upstream", c.String(), "targetHost", body.Hostname, "err", err) 77 + } 78 + s.logger.Info("successfully forwarded requestCrawl", "upstream", c.String(), "targetHost", body.Hostname) 79 + }() 80 + } 81 + 82 + return c.JSON(http.StatusOK, map[string]any{"success": true}) 83 + } 84 + 85 + // Proxies a request to the single upstream (relay) 86 + func (s *Splitter) ProxyRequestUpstream(c echo.Context) error { 87 + 88 + req := c.Request() 89 + respWriter := c.Response() 90 + 91 + u := req.URL 92 + usu, err := url.Parse(s.conf.XrpcRootUrl()) 93 + if err != nil { 94 + return err 95 + } 96 + u.Scheme = usu.Scheme 97 + u.Host = usu.Host 98 + upstreamReq, err := http.NewRequest(req.Method, req.URL.String(), req.Body) 99 + if err != nil { 100 + s.logger.Warn("proxy request failed", "err", err) 101 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "failed to proxy to upstream relay"}) 102 + } 103 + 104 + for k, vals := range req.Header { 105 + if strings.ToLower(k) == "accept" { 106 + upstreamReq.Header.Add(k, vals[0]) 107 + } 108 + } 109 + 110 + upstreamResp, err := s.upstreamClient.Do(upstreamReq) 111 + if err != nil { 112 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "failed to proxy to upstream relay"}) 113 + } 114 + defer upstreamResp.Body.Close() 115 + 116 + respWriter.Header()["Content-Type"] = []string{upstreamResp.Header.Get("Content-Type")} 117 + respWriter.WriteHeader(upstreamResp.StatusCode) 118 + 119 + _, err = io.Copy(respWriter, upstreamResp.Body) 120 + if err != nil { 121 + s.logger.Error("error copying proxy body", "err", err) 122 + } 123 + 124 + return nil 125 + }
+89 -429
splitter/splitter.go
··· 1 1 package splitter 2 2 3 3 import ( 4 - "bytes" 5 4 "context" 6 - "encoding/json" 7 5 "errors" 8 6 "fmt" 9 7 "io" ··· 18 16 "sync" 19 17 "time" 20 18 21 - "github.com/bluesky-social/indigo/api/atproto" 22 - comatproto "github.com/bluesky-social/indigo/api/atproto" 23 19 "github.com/bluesky-social/indigo/events" 24 20 "github.com/bluesky-social/indigo/events/pebblepersist" 25 21 "github.com/bluesky-social/indigo/events/schedulers/sequential" 26 22 "github.com/bluesky-social/indigo/util" 27 23 "github.com/bluesky-social/indigo/util/svcutil" 28 - "github.com/bluesky-social/indigo/xrpc" 29 24 30 25 "github.com/gorilla/websocket" 31 26 "github.com/labstack/echo/v4" 32 27 "github.com/labstack/echo/v4/middleware" 33 - "github.com/prometheus/client_golang/prometheus" 34 28 "github.com/prometheus/client_golang/prometheus/promhttp" 35 - dto "github.com/prometheus/client_model/go" 36 - "go.opentelemetry.io/otel" 37 29 ) 38 30 39 31 type Splitter struct { ··· 48 40 49 41 conf SplitterConfig 50 42 51 - log *slog.Logger 43 + logger *slog.Logger 52 44 53 - httpC *http.Client 54 - nextCrawlers []*url.URL 45 + upstreamClient *http.Client 46 + nextCrawlers []*url.URL 55 47 } 56 48 57 49 type SplitterConfig struct { 58 50 UpstreamHost string 59 51 CursorFile string 52 + UserAgent string 60 53 PebbleOptions *pebblepersist.PebblePersistOptions 54 + Logger *slog.Logger 61 55 } 62 56 63 57 func (sc *SplitterConfig) XrpcRootUrl() string { ··· 77 71 } 78 72 79 73 func (sc *SplitterConfig) UpstreamUrl() string { 80 - if strings.HasPrefix(sc.UpstreamHost, "ws://") { 81 - return "http://" + sc.UpstreamHost[7:] 74 + if strings.HasPrefix(sc.UpstreamHost, "http://") { 75 + return "ws://" + sc.UpstreamHost[7:] 82 76 } 83 - if strings.HasPrefix(sc.UpstreamHost, "wss://") { 84 - return "https://" + sc.UpstreamHost[8:] 77 + if strings.HasPrefix(sc.UpstreamHost, "https://") { 78 + return "wss://" + sc.UpstreamHost[8:] 85 79 } 86 80 if strings.HasPrefix(sc.UpstreamHost, "ws://") { 87 81 return sc.UpstreamHost ··· 93 87 } 94 88 95 89 func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error) { 90 + 91 + logger := conf.Logger 92 + if logger == nil { 93 + logger = slog.Default().With("system", "splitter") 94 + } 95 + 96 96 var nextCrawlerURLs []*url.URL 97 - log := slog.Default().With("system", "splitter") 98 97 if len(nextCrawlers) > 0 { 99 98 nextCrawlerURLs = make([]*url.URL, len(nextCrawlers)) 100 99 for i, tu := range nextCrawlers { ··· 103 102 if err != nil { 104 103 return nil, fmt.Errorf("failed to parse next-crawler url: %w", err) 105 104 } 106 - log.Info("configuring relay for requestCrawl", "host", nextCrawlerURLs[i]) 105 + logger.Info("configuring relay for requestCrawl", "host", nextCrawlerURLs[i]) 107 106 } 108 107 } 109 108 ··· 113 112 } 114 113 115 114 s := &Splitter{ 116 - conf: conf, 117 - consumers: make(map[uint64]*SocketConsumer), 118 - log: log, 119 - httpC: util.RobustHTTPClient(), 120 - nextCrawlers: nextCrawlerURLs, 115 + conf: conf, 116 + consumers: make(map[uint64]*SocketConsumer), 117 + logger: logger, 118 + upstreamClient: util.RobustHTTPClient(), 119 + nextCrawlers: nextCrawlerURLs, 121 120 } 122 121 123 122 if conf.PebbleOptions == nil { ··· 137 136 138 137 return s, nil 139 138 } 140 - func NewDiskSplitter(host, path string, persistHours float64, maxBytes int64) (*Splitter, error) { 141 - ppopts := pebblepersist.PebblePersistOptions{ 142 - DbPath: path, 143 - PersistDuration: time.Duration(float64(time.Hour) * persistHours), 144 - GCPeriod: 5 * time.Minute, 145 - MaxBytes: uint64(maxBytes), 146 - } 147 - conf := SplitterConfig{ 148 - UpstreamHost: host, 149 - CursorFile: "cursor-file", 150 - PebbleOptions: &ppopts, 151 - } 152 - pp, err := pebblepersist.NewPebblePersistance(&ppopts) 153 - if err != nil { 154 - return nil, err 155 - } 156 139 157 - go pp.GCThread(context.Background()) 158 - em := events.NewEventManager(pp) 159 - return &Splitter{ 160 - conf: conf, 161 - pp: pp, 162 - events: em, 163 - consumers: make(map[uint64]*SocketConsumer), 164 - log: slog.Default().With("system", "splitter"), 165 - }, nil 166 - } 167 - 168 - func (s *Splitter) Start(addr string) error { 140 + func (s *Splitter) StartAPI(addr string) error { 169 141 var lc net.ListenConfig 170 142 ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) 171 143 defer cancel() ··· 181 153 if err != nil { 182 154 return err 183 155 } 184 - return s.StartWithListener(li) 156 + return s.startWithListener(li) 185 157 } 186 158 187 159 func (s *Splitter) StartMetrics(listen string) error { ··· 193 165 return nil 194 166 } 195 167 196 - func (s *Splitter) StartWithListener(listen net.Listener) error { 168 + func (s *Splitter) startWithListener(listen net.Listener) error { 197 169 e := echo.New() 198 170 e.HideBanner = true 199 171 ··· 213 185 */ 214 186 215 187 e.Use(svcutil.MetricsMiddleware) 188 + e.HTTPErrorHandler = s.errorHandler 216 189 217 - e.HTTPErrorHandler = func(err error, ctx echo.Context) { 218 - switch err := err.(type) { 219 - case *echo.HTTPError: 220 - if err2 := ctx.JSON(err.Code, map[string]any{ 221 - "error": err.Message, 222 - }); err2 != nil { 223 - s.log.Error("Failed to write http error", "err", err2) 224 - } 225 - default: 226 - sendHeader := true 227 - if ctx.Path() == "/xrpc/com.atproto.sync.subscribeRepos" { 228 - sendHeader = false 229 - } 190 + e.POST("/xrpc/com.atproto.sync.requestCrawl", s.HandleComAtprotoSyncRequestCrawl) 191 + e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.HandleSubscribeRepos) 230 192 231 - s.log.Warn("HANDLER ERROR", "path", ctx.Path(), "err", err) 232 - 233 - if strings.HasPrefix(ctx.Path(), "/admin/") { 234 - ctx.JSON(500, map[string]any{ 235 - "error": err.Error(), 236 - }) 237 - return 238 - } 239 - 240 - if sendHeader { 241 - ctx.Response().WriteHeader(500) 242 - } 243 - } 244 - } 245 - 246 - // TODO: this API is temporary until we formalize what we want here 247 - 248 - e.POST("/xrpc/com.atproto.sync.requestCrawl", s.RequestCrawlHandler) 249 - e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler) 250 - e.GET("/xrpc/com.atproto.sync.listRepos", s.HandleComAtprotoSyncListRepos) 193 + // proxy endpoints to upstream (relay) 194 + e.GET("/xrpc/com.atproto.sync.listRepos", s.ProxyRequestUpstream) 195 + e.GET("/xrpc/com.atproto.sync.getRepoStatus", s.ProxyRequestUpstream) 196 + e.GET("/xrpc/com.atproto.sync.getLatestCommit", s.ProxyRequestUpstream) 197 + e.GET("/xrpc/com.atproto.sync.listHosts", s.ProxyRequestUpstream) 198 + e.GET("/xrpc/com.atproto.sync.getHostStatus", s.ProxyRequestUpstream) 199 + e.GET("/xrpc/com.atproto.sync.getRepo", s.ProxyRequestUpstream) 200 + // TODO: proxy listReposByCollection to a different host? 251 201 252 202 e.GET("/xrpc/_health", s.HandleHealthCheck) 253 203 e.GET("/_health", s.HandleHealthCheck) ··· 261 211 return e.StartServer(srv) 262 212 } 263 213 264 - type HealthStatus struct { 265 - Status string `json:"status"` 266 - Message string `json:"msg,omitempty"` 267 - } 268 - 269 - func (s *Splitter) HandleHealthCheck(c echo.Context) error { 270 - return c.JSON(200, HealthStatus{Status: "ok"}) 271 - } 272 - 273 - var homeMessage string = ` 274 - _ _ 275 - _ _ __ _(_)_ _ | |__ _____ __ __ 276 - | '_/ _' | | ' \| '_ \/ _ \ V V / 277 - |_| \__,_|_|_||_|_.__/\___/\_/\_/ 278 - 279 - This is an atproto [https://atproto.com] firehose fanout service, running the 'rainbow' codebase [https://github.com/bluesky-social/indigo] 280 - 281 - The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos 282 - ` 283 - 284 - func (s *Splitter) HandleHomeMessage(c echo.Context) error { 285 - return c.String(http.StatusOK, homeMessage) 286 - } 287 - 288 - type XRPCError struct { 289 - Message string `json:"message"` 290 - } 291 - 292 - func (s *Splitter) RequestCrawlHandler(c echo.Context) error { 293 - ctx := c.Request().Context() 294 - var body comatproto.SyncRequestCrawl_Input 295 - if err := c.Bind(&body); err != nil { 296 - return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid body: %s", err)}) 297 - } 298 - 299 - host := body.Hostname 300 - if host == "" { 301 - return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname") 302 - } 303 - 304 - if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") { 305 - host = "https://" + host 306 - } 307 - 308 - u, err := url.Parse(host) 309 - if err != nil { 310 - return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname") 311 - } 312 - 313 - if u.Scheme == "http" { 314 - return echo.NewHTTPError(http.StatusBadRequest, "this server requires https") 315 - } 316 - if u.Path != "" { 317 - return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path") 318 - } 319 - 320 - if u.Query().Encode() != "" { 321 - return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without query") 322 - } 323 - 324 - host = u.Host // potentially hostname:port 325 - 326 - clientHost := fmt.Sprintf("%s://%s", u.Scheme, host) 327 - 328 - xrpcC := &xrpc.Client{ 329 - Host: clientHost, 330 - Client: http.DefaultClient, // not using the client that auto-retries 331 - } 332 - 333 - desc, err := atproto.ServerDescribeServer(ctx, xrpcC) 334 - if err != nil { 335 - errMsg := fmt.Sprintf("requested host (%s) failed to respond to describe request", clientHost) 336 - return echo.NewHTTPError(http.StatusBadRequest, errMsg) 337 - } 338 - 339 - // Maybe we could do something with this response later 340 - _ = desc 341 - 342 - if len(s.nextCrawlers) != 0 { 343 - blob, err := json.Marshal(body) 344 - if err != nil { 345 - s.log.Warn("could not forward requestCrawl, json err", "err", err) 346 - } else { 347 - go func(bodyBlob []byte) { 348 - for _, remote := range s.nextCrawlers { 349 - if remote == nil { 350 - continue 351 - } 352 - 353 - pu := remote.JoinPath("/xrpc/com.atproto.sync.requestCrawl") 354 - response, err := s.httpC.Post(pu.String(), "application/json", bytes.NewReader(bodyBlob)) 355 - if response != nil && response.Body != nil { 356 - response.Body.Close() 357 - } 358 - if err != nil || response == nil { 359 - s.log.Warn("requestCrawl forward failed", "host", remote, "err", err) 360 - } else if response.StatusCode != http.StatusOK { 361 - s.log.Warn("requestCrawl forward failed", "host", remote, "status", response.Status) 362 - } else { 363 - s.log.Info("requestCrawl forward successful", "host", remote) 364 - } 365 - } 366 - }(blob) 214 + func (s *Splitter) errorHandler(err error, ctx echo.Context) { 215 + switch err := err.(type) { 216 + case *echo.HTTPError: 217 + if err2 := ctx.JSON(err.Code, map[string]any{ 218 + "error": err.Message, 219 + }); err2 != nil { 220 + s.logger.Error("Failed to write http error", "err", err2) 221 + } 222 + default: 223 + sendHeader := true 224 + if ctx.Path() == "/xrpc/com.atproto.sync.subscribeRepos" { 225 + sendHeader = false 367 226 } 368 - } 369 - 370 - return c.JSON(200, HealthStatus{Status: "ok"}) 371 - } 372 - 373 - func (s *Splitter) HandleComAtprotoSyncListRepos(c echo.Context) error { 374 - ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListRepos") 375 - defer span.End() 376 227 377 - cursorQuery := c.QueryParam("cursor") 378 - limitQuery := c.QueryParam("limit") 379 - 380 - var err error 228 + s.logger.Warn("HANDLER ERROR", "path", ctx.Path(), "err", err) 381 229 382 - limit := int64(500) 383 - if limitQuery != "" { 384 - limit, err = strconv.ParseInt(limitQuery, 10, 64) 385 - if err != nil || limit < 1 || limit > 1000 { 386 - return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid limit: %s", limitQuery)}) 230 + if strings.HasPrefix(ctx.Path(), "/admin/") { 231 + ctx.JSON(500, map[string]any{ 232 + "error": err.Error(), 233 + }) 234 + return 387 235 } 388 - } 389 236 390 - client := xrpc.Client{ 391 - Client: s.httpC, 392 - Host: s.conf.XrpcRootUrl(), 237 + if sendHeader { 238 + ctx.Response().WriteHeader(500) 239 + } 393 240 } 394 - 395 - out, handleErr := atproto.SyncListRepos(ctx, &client, cursorQuery, limit) 396 - if handleErr != nil { 397 - return handleErr 398 - } 399 - return c.JSON(200, out) 400 241 } 401 242 402 - func (s *Splitter) EventsHandler(c echo.Context) error { 403 - var since *int64 404 - if sinceVal := c.QueryParam("cursor"); sinceVal != "" { 405 - sval, err := strconv.ParseInt(sinceVal, 10, 64) 406 - if err != nil { 407 - return err 243 + func (s *Splitter) getLastCursor() (int64, error) { 244 + if s.pp != nil { 245 + seq, millis, _, err := s.pp.GetLast(context.Background()) 246 + if err == nil { 247 + s.logger.Debug("got last cursor from pebble", "seq", seq, "millis", millis) 248 + return seq, nil 249 + } else if errors.Is(err, pebblepersist.ErrNoLast) { 250 + s.logger.Info("pebble no last") 251 + } else { 252 + s.logger.Error("pebble seq fail", "err", err) 408 253 } 409 - since = &sval 410 254 } 411 255 412 - ctx, cancel := context.WithCancel(c.Request().Context()) 413 - defer cancel() 414 - 415 - // TODO: authhhh 416 - conn, err := websocket.Upgrade(c.Response(), c.Request(), c.Response().Header(), 10<<10, 10<<10) 256 + fi, err := os.Open(s.conf.CursorFile) 417 257 if err != nil { 418 - return fmt.Errorf("upgrading websocket: %w", err) 419 - } 420 - 421 - defer conn.Close() 422 - 423 - lastWriteLk := sync.Mutex{} 424 - lastWrite := time.Now() 425 - 426 - // Start a goroutine to ping the client every 30 seconds to check if it's 427 - // still alive. If the client doesn't respond to a ping within 5 seconds, 428 - // we'll close the connection and teardown the consumer. 429 - go func() { 430 - ticker := time.NewTicker(30 * time.Second) 431 - defer ticker.Stop() 432 - 433 - for { 434 - select { 435 - case <-ticker.C: 436 - lastWriteLk.Lock() 437 - lw := lastWrite 438 - lastWriteLk.Unlock() 439 - 440 - if time.Since(lw) < 30*time.Second { 441 - continue 442 - } 443 - 444 - if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)); err != nil { 445 - s.log.Error("failed to ping client", "err", err) 446 - cancel() 447 - return 448 - } 449 - case <-ctx.Done(): 450 - return 451 - } 258 + if os.IsNotExist(err) { 259 + return -1, nil 452 260 } 453 - }() 261 + return -1, err 262 + } 454 263 455 - conn.SetPingHandler(func(message string) error { 456 - err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second*60)) 457 - if err == websocket.ErrCloseSent { 458 - return nil 459 - } else if e, ok := err.(net.Error); ok && e.Temporary() { 460 - return nil 461 - } 462 - return err 463 - }) 464 - 465 - // Start a goroutine to read messages from the client and discard them. 466 - go func() { 467 - for { 468 - _, _, err := conn.ReadMessage() 469 - if err != nil { 470 - s.log.Error("failed to read message from client", "err", err) 471 - cancel() 472 - return 473 - } 474 - } 475 - }() 476 - 477 - ident := c.RealIP() + "-" + c.Request().UserAgent() 478 - 479 - evts, cleanup, err := s.events.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { return true }, since) 264 + b, err := io.ReadAll(fi) 480 265 if err != nil { 481 - return err 266 + return -1, err 482 267 } 483 - defer cleanup() 484 268 485 - // Keep track of the consumer for metrics and admin endpoints 486 - consumer := SocketConsumer{ 487 - RemoteAddr: c.RealIP(), 488 - UserAgent: c.Request().UserAgent(), 489 - ConnectedAt: time.Now(), 269 + v, err := strconv.ParseInt(string(b), 10, 64) 270 + if err != nil { 271 + return -1, err 490 272 } 491 - sentCounter := eventsSentCounter.WithLabelValues(consumer.RemoteAddr, consumer.UserAgent) 492 - consumer.EventsSent = sentCounter 493 273 494 - consumerID := s.registerConsumer(&consumer) 495 - defer s.cleanupConsumer(consumerID) 496 - 497 - s.log.Info("new consumer", 498 - "remote_addr", consumer.RemoteAddr, 499 - "user_agent", consumer.UserAgent, 500 - "cursor", since, 501 - "consumer_id", consumerID, 502 - ) 503 - activeClientGauge.Inc() 504 - defer activeClientGauge.Dec() 505 - 506 - for { 507 - select { 508 - case evt, ok := <-evts: 509 - if !ok { 510 - s.log.Error("event stream closed unexpectedly") 511 - return nil 512 - } 513 - 514 - wc, err := conn.NextWriter(websocket.BinaryMessage) 515 - if err != nil { 516 - s.log.Error("failed to get next writer", "err", err) 517 - return err 518 - } 519 - 520 - if evt.Preserialized != nil { 521 - _, err = wc.Write(evt.Preserialized) 522 - } else { 523 - err = evt.Serialize(wc) 524 - } 525 - if err != nil { 526 - return fmt.Errorf("failed to write event: %w", err) 527 - } 528 - 529 - if err := wc.Close(); err != nil { 530 - s.log.Warn("failed to flush-close our event write", "err", err) 531 - return nil 532 - } 533 - 534 - lastWriteLk.Lock() 535 - lastWrite = time.Now() 536 - lastWriteLk.Unlock() 537 - sentCounter.Inc() 538 - case <-ctx.Done(): 539 - return nil 540 - } 541 - } 274 + return v, nil 542 275 } 543 276 544 - type SocketConsumer struct { 545 - UserAgent string 546 - RemoteAddr string 547 - ConnectedAt time.Time 548 - EventsSent prometheus.Counter 549 - } 550 - 551 - func (s *Splitter) registerConsumer(c *SocketConsumer) uint64 { 552 - s.consumersLk.Lock() 553 - defer s.consumersLk.Unlock() 554 - 555 - id := s.nextConsumerID 556 - s.nextConsumerID++ 557 - 558 - s.consumers[id] = c 559 - 560 - return id 561 - } 562 - 563 - func (s *Splitter) cleanupConsumer(id uint64) { 564 - s.consumersLk.Lock() 565 - defer s.consumersLk.Unlock() 566 - 567 - c := s.consumers[id] 568 - 569 - var m = &dto.Metric{} 570 - if err := c.EventsSent.Write(m); err != nil { 571 - s.log.Error("failed to get sent counter", "err", err) 572 - } 573 - 574 - s.log.Info("consumer disconnected", 575 - "consumer_id", id, 576 - "remote_addr", c.RemoteAddr, 577 - "user_agent", c.UserAgent, 578 - "events_sent", m.Counter.GetValue()) 579 - 580 - delete(s.consumers, id) 277 + func (s *Splitter) writeCursor(curs int64) error { 278 + return os.WriteFile(s.conf.CursorFile, []byte(fmt.Sprint(curs)), 0664) 581 279 } 582 280 583 281 func sleepForBackoff(b int) time.Duration { ··· 601 299 } 602 300 upstreamUrl = upstreamUrl.JoinPath("/xrpc/com.atproto.sync.subscribeRepos") 603 301 302 + header := http.Header{ 303 + "User-Agent": []string{s.conf.UserAgent}, 304 + } 305 + 604 306 var backoff int 605 307 for { 606 308 select { 607 309 case <-ctx.Done(): 608 310 return 609 311 default: 610 - } 611 - 612 - header := http.Header{ 613 - "User-Agent": []string{"bgs-rainbow-v0"}, 614 312 } 615 313 616 314 var uurl string ··· 623 321 } 624 322 con, res, err := d.DialContext(ctx, uurl, header) 625 323 if err != nil { 626 - s.log.Warn("dialing failed", "url", uurl, "err", err, "backoff", backoff) 324 + s.logger.Warn("dialing failed", "url", uurl, "err", err, "backoff", backoff) 627 325 time.Sleep(sleepForBackoff(backoff)) 628 326 backoff++ 629 327 630 328 continue 631 329 } 632 330 633 - s.log.Info("event subscription response", "code", res.StatusCode) 331 + s.logger.Info("event subscription response", "code", res.StatusCode) 634 332 635 - if err := s.handleConnection(ctx, con, &cursor); err != nil { 636 - s.log.Warn("connection failed", "url", uurl, "err", err) 333 + if err := s.handleUpstreamConnection(ctx, con, &cursor); err != nil { 334 + s.logger.Warn("upstream connection failed", "url", uurl, "err", err) 637 335 } 638 336 } 639 337 } 640 338 641 - func (s *Splitter) handleConnection(ctx context.Context, con *websocket.Conn, lastCursor *int64) error { 339 + func (s *Splitter) handleUpstreamConnection(ctx context.Context, con *websocket.Conn, lastCursor *int64) error { 642 340 ctx, cancel := context.WithCancel(ctx) 643 341 defer cancel() 644 342 ··· 656 354 if seq%5000 == 0 { 657 355 // TODO: don't need this after we move to getting seq from pebble 658 356 if err := s.writeCursor(seq); err != nil { 659 - s.log.Error("write cursor failed", "err", err) 357 + s.logger.Error("write cursor failed", "err", err) 660 358 } 661 359 } 662 360 ··· 666 364 667 365 return events.HandleRepoStream(ctx, con, sched, nil) 668 366 } 669 - 670 - func (s *Splitter) getLastCursor() (int64, error) { 671 - if s.pp != nil { 672 - seq, millis, _, err := s.pp.GetLast(context.Background()) 673 - if err == nil { 674 - s.log.Debug("got last cursor from pebble", "seq", seq, "millis", millis) 675 - return seq, nil 676 - } else if errors.Is(err, pebblepersist.ErrNoLast) { 677 - s.log.Info("pebble no last") 678 - } else { 679 - s.log.Error("pebble seq fail", "err", err) 680 - } 681 - } 682 - 683 - fi, err := os.Open(s.conf.CursorFile) 684 - if err != nil { 685 - if os.IsNotExist(err) { 686 - return -1, nil 687 - } 688 - return -1, err 689 - } 690 - 691 - b, err := io.ReadAll(fi) 692 - if err != nil { 693 - return -1, err 694 - } 695 - 696 - v, err := strconv.ParseInt(string(b), 10, 64) 697 - if err != nil { 698 - return -1, err 699 - } 700 - 701 - return v, nil 702 - } 703 - 704 - func (s *Splitter) writeCursor(curs int64) error { 705 - return os.WriteFile(s.conf.CursorFile, []byte(fmt.Sprint(curs)), 0664) 706 - }