this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

remove echo dep from relay pkg and test runner

+39 -33
+8 -17
cmd/relayered/relay/subscribe_repos.go
··· 4 4 "context" 5 5 "fmt" 6 6 "net" 7 - "strconv" 7 + "net/http" 8 8 "sync" 9 9 "time" 10 10 11 11 "github.com/bluesky-social/indigo/cmd/relayered/stream" 12 12 13 13 "github.com/gorilla/websocket" 14 - "github.com/labstack/echo/v4" 15 14 promclient "github.com/prometheus/client_golang/prometheus" 16 15 dto "github.com/prometheus/client_model/go" 17 16 ) ··· 55 54 delete(r.consumers, id) 56 55 } 57 56 58 - // GET+websocket /xrpc/com.atproto.sync.subscribeRepos 59 - func (r *Relay) EventsHandler(c echo.Context) error { 60 - var since *int64 61 - if sinceVal := c.QueryParam("cursor"); sinceVal != "" { 62 - sval, err := strconv.ParseInt(sinceVal, 10, 64) 63 - if err != nil { 64 - return err 65 - } 66 - since = &sval 67 - } 57 + // Main HTTP request handler for clients connecting to the firehose (com.atproto.sync.subscribeRepos) 58 + func (r *Relay) HandleSubscribeRepos(resp http.ResponseWriter, req *http.Request, since *int64, realIP string) error { 68 59 69 - ctx, cancel := context.WithCancel(c.Request().Context()) 60 + ctx, cancel := context.WithCancel(req.Context()) 70 61 defer cancel() 71 62 72 - conn, err := websocket.Upgrade(c.Response(), c.Request(), c.Response().Header(), 10<<10, 10<<10) 63 + conn, err := websocket.Upgrade(resp, req, resp.Header(), 10<<10, 10<<10) 73 64 if err != nil { 74 65 return fmt.Errorf("upgrading websocket: %w", err) 75 66 } ··· 130 121 } 131 122 }() 132 123 133 - ident := c.RealIP() + "-" + c.Request().UserAgent() 124 + ident := realIP + "-" + req.UserAgent() 134 125 135 126 evts, cleanup, err := r.Events.Subscribe(ctx, ident, func(evt *stream.XRPCStreamEvent) bool { return true }, since) 136 127 if err != nil { ··· 140 131 141 132 // Keep track of the consumer for metrics and admin endpoints 142 133 consumer := SocketConsumer{ 143 - RemoteAddr: c.RealIP(), 144 - UserAgent: c.Request().UserAgent(), 134 + RemoteAddr: realIP, 135 + UserAgent: req.UserAgent(), 145 136 ConnectedAt: time.Now(), 146 137 } 147 138 sentCounter := eventsSentCounter.WithLabelValues(consumer.RemoteAddr, consumer.UserAgent)
+1 -1
cmd/relayered/service.go
··· 138 138 e.GET("/_health", svc.HandleHealthCheck) 139 139 e.GET("/xrpc/_health", svc.HandleHealthCheck) 140 140 141 - e.GET("/xrpc/com.atproto.sync.subscribeRepos", svc.relay.EventsHandler) 141 + e.GET("/xrpc/com.atproto.sync.subscribeRepos", svc.HandleComAtprotoSyncSubscribeRepos) 142 142 143 143 e.POST("/xrpc/com.atproto.sync.requestCrawl", svc.HandleComAtprotoSyncRequestCrawl) 144 144 e.GET("/xrpc/com.atproto.sync.listRepos", svc.HandleComAtprotoSyncListRepos)
+12
cmd/relayered/stubs.go
··· 28 28 return nil 29 29 } 30 30 31 + func (s *Service) HandleComAtprotoSyncSubscribeRepos(c echo.Context) error { 32 + var since *int64 33 + if sinceVal := c.QueryParam("cursor"); sinceVal != "" { 34 + sval, err := strconv.ParseInt(sinceVal, 10, 64) 35 + if err != nil { 36 + return err 37 + } 38 + since = &sval 39 + } 40 + return s.relay.HandleSubscribeRepos(c.Response(), c.Request(), since, c.RealIP()) 41 + } 42 + 31 43 func (s *Service) HandleComAtprotoSyncGetLatestCommit(c echo.Context) error { 32 44 ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetLatestCommit") 33 45 defer span.End()
+1 -1
cmd/relayered/testing/producer.go
··· 32 32 BufferSize: 1024, 33 33 mux: mux, 34 34 } 35 - mux.HandleFunc("/xrpc/com.atproto.sync.subscribeRepos", p.handleSubscribeRepos) 35 + mux.HandleFunc("GET /xrpc/com.atproto.sync.subscribeRepos", p.handleSubscribeRepos) 36 36 return &p 37 37 } 38 38
+17 -14
cmd/relayered/testing/runner.go
··· 16 16 "github.com/bluesky-social/indigo/cmd/relayered/stream/eventmgr" 17 17 "github.com/bluesky-social/indigo/cmd/relayered/stream/persist/diskpersist" 18 18 "github.com/bluesky-social/indigo/util/cliutil" 19 - 20 - "github.com/labstack/echo/v4" 21 19 ) 22 20 23 21 type SimpleRelay struct { 24 22 Relay *relay.Relay 25 23 Port int 26 - echo *echo.Echo 24 + } 25 + 26 + func (sr *SimpleRelay) handleSubscribeRepos(w http.ResponseWriter, r *http.Request) { 27 + err := sr.Relay.HandleSubscribeRepos(w, r, nil, "0.0.0.0") 28 + if err != nil { 29 + slog.Error("subscribeRepos", "err", err) 30 + } 27 31 } 28 32 29 33 func MustSimpleRelay(dir identity.Directory, tmpd string) *SimpleRelay { ··· 56 60 panic(err) 57 61 } 58 62 port := listener.Addr().(*net.TCPAddr).Port 59 - slog.Info("starting test relay", "port", port) 63 + 64 + sr := SimpleRelay{ 65 + Relay: r, 66 + Port: port, 67 + } 60 68 61 - e := echo.New() 62 - e.HideBanner = true 63 - e.GET("/xrpc/com.atproto.sync.subscribeRepos", r.EventsHandler) 64 - e.Listener = listener 65 - srv := &http.Server{} 69 + mux := http.NewServeMux() 70 + mux.HandleFunc("GET /xrpc/com.atproto.sync.subscribeRepos", sr.handleSubscribeRepos) 66 71 72 + slog.Info("starting test relay", "port", port) 67 73 go func() { 68 74 defer listener.Close() 69 - err := e.StartServer(srv) 75 + err := http.Serve(listener, mux) 70 76 if err != nil { 71 77 slog.Warn("test relay shutting down", "err", err) 72 78 } 73 79 }() 74 - return &SimpleRelay{ 75 - Relay: r, 76 - Port: port, 77 - } 80 + return &sr 78 81 } 79 82 80 83 func LoadAndRunScenario(ctx context.Context, fpath string) error {