this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

splitter is now functional

+309 -152
-147
cmd/splitter/main.go
··· 1 - package main 2 - 3 - import ( 4 - "context" 5 - "os" 6 - "os/signal" 7 - "syscall" 8 - "time" 9 - 10 - "github.com/bluesky-social/indigo/bgs" 11 - "github.com/bluesky-social/indigo/util/version" 12 - _ "go.uber.org/automaxprocs" 13 - 14 - _ "net/http/pprof" 15 - 16 - _ "github.com/joho/godotenv/autoload" 17 - 18 - logging "github.com/ipfs/go-log" 19 - "github.com/urfave/cli/v2" 20 - "go.opentelemetry.io/otel" 21 - "go.opentelemetry.io/otel/attribute" 22 - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" 23 - "go.opentelemetry.io/otel/sdk/resource" 24 - tracesdk "go.opentelemetry.io/otel/sdk/trace" 25 - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" 26 - ) 27 - 28 - var log = logging.Logger("splitter") 29 - 30 - func init() { 31 - // control log level using, eg, GOLOG_LOG_LEVEL=debug 32 - //logging.SetAllLoggers(logging.LevelDebug) 33 - } 34 - 35 - func main() { 36 - run(os.Args) 37 - } 38 - 39 - func run(args []string) { 40 - app := cli.App{ 41 - Name: "splitter", 42 - Usage: "firehose proxy", 43 - Version: version.Version, 44 - } 45 - 46 - app.Flags = []cli.Flag{ 47 - &cli.BoolFlag{ 48 - Name: "crawl-insecure-ws", 49 - Usage: "when connecting to PDS instances, use ws:// instead of wss://", 50 - }, 51 - &cli.StringFlag{ 52 - Name: "api-listen", 53 - Value: ":2480", 54 - }, 55 - &cli.StringFlag{ 56 - Name: "metrics-listen", 57 - Value: ":2481", 58 - EnvVars: []string{"SPLITTER_METRICS_LISTEN"}, 59 - }, 60 - } 61 - 62 - app.Action = Splitter 63 - err := app.Run(os.Args) 64 - if err != nil { 65 - log.Fatal(err) 66 - } 67 - } 68 - 69 - func Splitter(cctx *cli.Context) error { 70 - // Trap SIGINT to trigger a shutdown. 71 - signals := make(chan os.Signal, 1) 72 - signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 73 - 74 - // Enable OTLP HTTP exporter 75 - // For relevant environment variables: 76 - // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables 77 - // At a minimum, you need to set 78 - // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 79 - if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" { 80 - log.Infow("setting up trace exporter", "endpoint", ep) 81 - ctx, cancel := context.WithCancel(context.Background()) 82 - defer cancel() 83 - 84 - exp, err := otlptracehttp.New(ctx) 85 - if err != nil { 86 - log.Fatalw("failed to create trace exporter", "error", err) 87 - } 88 - defer func() { 89 - ctx, cancel := context.WithTimeout(context.Background(), time.Second) 90 - defer cancel() 91 - if err := exp.Shutdown(ctx); err != nil { 92 - log.Errorw("failed to shutdown trace exporter", "error", err) 93 - } 94 - }() 95 - 96 - tp := tracesdk.NewTracerProvider( 97 - tracesdk.WithBatcher(exp), 98 - tracesdk.WithResource(resource.NewWithAttributes( 99 - semconv.SchemaURL, 100 - semconv.ServiceNameKey.String("splitter"), 101 - attribute.String("env", os.Getenv("ENVIRONMENT")), // DataDog 102 - attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others 103 - attribute.Int64("ID", 1), 104 - )), 105 - ) 106 - otel.SetTracerProvider(tp) 107 - } 108 - 109 - spl := splitter.New(cctx.String("bgs-host")) 110 - 111 - // set up metrics endpoint 112 - go func() { 113 - if err := spl.StartMetrics(cctx.String("metrics-listen")); err != nil { 114 - log.Fatalf("failed to start metrics endpoint: %s", err) 115 - } 116 - }() 117 - 118 - runErr := make(chan error, 1) 119 - 120 - go func() { 121 - err := spl.Start(cctx.String("api-listen")) 122 - runErr <- err 123 - }() 124 - 125 - log.Infow("startup complete") 126 - select { 127 - case <-signals: 128 - log.Info("received shutdown signal") 129 - errs := spl.Shutdown() 130 - for err := range errs { 131 - log.Errorw("error during Splitter shutdown", "err", err) 132 - } 133 - case err := <-runErr: 134 - if err != nil { 135 - log.Errorw("error during Splitter startup", "err", err) 136 - } 137 - log.Info("shutting down") 138 - errs := bgs.Shutdown() 139 - for err := range errs { 140 - log.Errorw("error during Splitter shutdown", "err", err) 141 - } 142 - } 143 - 144 - log.Info("shutdown complete") 145 - 146 - return nil 147 - }
+309 -5
splitter/splitter.go
··· 4 4 "context" 5 5 "fmt" 6 6 "math/rand" 7 + "net" 8 + "net/http" 9 + "strconv" 10 + "strings" 7 11 "sync" 8 12 "time" 9 13 14 + "github.com/bluesky-social/indigo/bgs" 10 15 events "github.com/bluesky-social/indigo/events" 11 16 "github.com/bluesky-social/indigo/events/schedulers/sequential" 17 + lexutil "github.com/bluesky-social/indigo/lex/util" 12 18 "github.com/bluesky-social/indigo/models" 13 19 "github.com/gorilla/websocket" 14 20 logging "github.com/ipfs/go-log" 21 + "github.com/labstack/echo/v4" 22 + "github.com/labstack/echo/v4/middleware" 23 + promclient "github.com/prometheus/client_golang/prometheus" 24 + "github.com/prometheus/client_golang/prometheus/promhttp" 25 + dto "github.com/prometheus/client_model/go" 15 26 ) 16 27 17 28 var log = logging.Logger("splitter") ··· 20 31 Host string 21 32 erb *EventRingBuffer 22 33 events *events.EventManager 34 + 35 + // Management of Socket Consumers 36 + consumersLk sync.RWMutex 37 + nextConsumerID uint64 38 + consumers map[uint64]*SocketConsumer 23 39 } 24 40 25 - func NewSplitter(host string, persister events.EventPersistence) *Splitter { 41 + func NewSplitter(host string) *Splitter { 26 42 erb := NewEventRingBuffer(20000, 1000) 27 43 28 44 em := events.NewEventManager(erb) 29 45 return &Splitter{ 30 - Host: host, 31 - erb: erb, 32 - events: em, 46 + Host: host, 47 + erb: erb, 48 + events: em, 49 + consumers: make(map[uint64]*SocketConsumer), 33 50 } 34 51 } 35 52 36 - func (s *Splitter) Start() error { 53 + func (s *Splitter) Start(addr string) error { 54 + var lc net.ListenConfig 55 + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) 56 + defer cancel() 57 + 58 + go s.subscribeWithRedialer(context.Background(), s.Host, 0) 59 + 60 + li, err := lc.Listen(ctx, "tcp", addr) 61 + if err != nil { 62 + return err 63 + } 64 + return s.StartWithListener(li) 65 + } 66 + 67 + func (s *Splitter) StartMetrics(listen string) error { 68 + http.Handle("/metrics", promhttp.Handler()) 69 + return http.ListenAndServe(listen, nil) 70 + } 71 + 72 + func (s *Splitter) Shutdown() error { 37 73 return nil 38 74 } 39 75 76 + func (s *Splitter) StartWithListener(listen net.Listener) error { 77 + e := echo.New() 78 + e.HideBanner = true 79 + 80 + e.Use(middleware.CORSWithConfig(middleware.CORSConfig{ 81 + AllowOrigins: []string{"http://localhost:*", "https://bgs.bsky-sandbox.dev"}, 82 + AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept, echo.HeaderAuthorization}, 83 + })) 84 + 85 + /* 86 + if !s.ssl { 87 + e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{ 88 + Format: "method=${method}, uri=${uri}, status=${status} latency=${latency_human}\n", 89 + })) 90 + } else { 91 + e.Use(middleware.LoggerWithConfig(middleware.DefaultLoggerConfig)) 92 + } 93 + */ 94 + 95 + e.Use(bgs.MetricsMiddleware) 96 + 97 + e.HTTPErrorHandler = func(err error, ctx echo.Context) { 98 + switch err := err.(type) { 99 + case *echo.HTTPError: 100 + if err2 := ctx.JSON(err.Code, map[string]any{ 101 + "error": err.Message, 102 + }); err2 != nil { 103 + log.Errorf("Failed to write http error: %s", err2) 104 + } 105 + default: 106 + sendHeader := true 107 + if ctx.Path() == "/xrpc/com.atproto.sync.subscribeRepos" { 108 + sendHeader = false 109 + } 110 + 111 + log.Warnf("HANDLER ERROR: (%s) %s", ctx.Path(), err) 112 + 113 + if strings.HasPrefix(ctx.Path(), "/admin/") { 114 + ctx.JSON(500, map[string]any{ 115 + "error": err.Error(), 116 + }) 117 + return 118 + } 119 + 120 + if sendHeader { 121 + ctx.Response().WriteHeader(500) 122 + } 123 + } 124 + } 125 + 126 + // TODO: this API is temporary until we formalize what we want here 127 + 128 + e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler) 129 + e.GET("/xrpc/_health", s.HandleHealthCheck) 130 + 131 + // In order to support booting on random ports in tests, we need to tell the 132 + // Echo instance it's already got a port, and then use its StartServer 133 + // method to re-use that listener. 134 + e.Listener = listen 135 + srv := &http.Server{} 136 + return e.StartServer(srv) 137 + } 138 + 139 + type HealthStatus struct { 140 + Status string `json:"status"` 141 + Message string `json:"msg,omitempty"` 142 + } 143 + 144 + func (s *Splitter) HandleHealthCheck(c echo.Context) error { 145 + return c.JSON(200, HealthStatus{Status: "ok"}) 146 + } 147 + 148 + func (s *Splitter) EventsHandler(c echo.Context) error { 149 + var since *int64 150 + if sinceVal := c.QueryParam("cursor"); sinceVal != "" { 151 + sval, err := strconv.ParseInt(sinceVal, 10, 64) 152 + if err != nil { 153 + return err 154 + } 155 + since = &sval 156 + } 157 + 158 + ctx, cancel := context.WithCancel(c.Request().Context()) 159 + defer cancel() 160 + 161 + // TODO: authhhh 162 + conn, err := websocket.Upgrade(c.Response(), c.Request(), c.Response().Header(), 10<<10, 10<<10) 163 + if err != nil { 164 + return fmt.Errorf("upgrading websocket: %w", err) 165 + } 166 + 167 + lastWriteLk := sync.Mutex{} 168 + lastWrite := time.Now() 169 + 170 + // Start a goroutine to ping the client every 30 seconds to check if it's 171 + // still alive. If the client doesn't respond to a ping within 5 seconds, 172 + // we'll close the connection and teardown the consumer. 173 + go func() { 174 + ticker := time.NewTicker(30 * time.Second) 175 + defer ticker.Stop() 176 + 177 + for { 178 + select { 179 + case <-ticker.C: 180 + lastWriteLk.Lock() 181 + lw := lastWrite 182 + lastWriteLk.Unlock() 183 + 184 + if time.Since(lw) < 30*time.Second { 185 + continue 186 + } 187 + 188 + if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)); err != nil { 189 + log.Errorf("failed to ping client: %s", err) 190 + cancel() 191 + return 192 + } 193 + case <-ctx.Done(): 194 + return 195 + } 196 + } 197 + }() 198 + 199 + conn.SetPingHandler(func(message string) error { 200 + err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second*60)) 201 + if err == websocket.ErrCloseSent { 202 + return nil 203 + } else if e, ok := err.(net.Error); ok && e.Temporary() { 204 + return nil 205 + } 206 + return err 207 + }) 208 + 209 + // Start a goroutine to read messages from the client and discard them. 210 + go func() { 211 + for { 212 + _, _, err := conn.ReadMessage() 213 + if err != nil { 214 + log.Errorf("failed to read message from client: %s", err) 215 + cancel() 216 + return 217 + } 218 + } 219 + }() 220 + 221 + ident := c.RealIP() + "-" + c.Request().UserAgent() 222 + 223 + evts, cleanup, err := s.events.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { return true }, since) 224 + if err != nil { 225 + return err 226 + } 227 + defer cleanup() 228 + 229 + // Keep track of the consumer for metrics and admin endpoints 230 + consumer := SocketConsumer{ 231 + RemoteAddr: c.RealIP(), 232 + UserAgent: c.Request().UserAgent(), 233 + ConnectedAt: time.Now(), 234 + } 235 + sentCounter := eventsSentCounter.WithLabelValues(consumer.RemoteAddr, consumer.UserAgent) 236 + consumer.EventsSent = sentCounter 237 + 238 + consumerID := s.registerConsumer(&consumer) 239 + defer s.cleanupConsumer(consumerID) 240 + 241 + log.Infow("new consumer", 242 + "remote_addr", consumer.RemoteAddr, 243 + "user_agent", consumer.UserAgent, 244 + "cursor", since, 245 + "consumer_id", consumerID, 246 + ) 247 + 248 + header := events.EventHeader{Op: events.EvtKindMessage} 249 + for { 250 + select { 251 + case evt := <-evts: 252 + wc, err := conn.NextWriter(websocket.BinaryMessage) 253 + if err != nil { 254 + log.Errorf("failed to get next writer: %s", err) 255 + return err 256 + } 257 + 258 + var obj lexutil.CBOR 259 + 260 + switch { 261 + case evt.Error != nil: 262 + header.Op = events.EvtKindErrorFrame 263 + obj = evt.Error 264 + case evt.RepoCommit != nil: 265 + header.MsgType = "#commit" 266 + obj = evt.RepoCommit 267 + case evt.RepoHandle != nil: 268 + header.MsgType = "#handle" 269 + obj = evt.RepoHandle 270 + case evt.RepoInfo != nil: 271 + header.MsgType = "#info" 272 + obj = evt.RepoInfo 273 + case evt.RepoMigrate != nil: 274 + header.MsgType = "#migrate" 275 + obj = evt.RepoMigrate 276 + case evt.RepoTombstone != nil: 277 + header.MsgType = "#tombstone" 278 + obj = evt.RepoTombstone 279 + default: 280 + return fmt.Errorf("unrecognized event kind") 281 + } 282 + 283 + if err := header.MarshalCBOR(wc); err != nil { 284 + return fmt.Errorf("failed to write header: %w", err) 285 + } 286 + 287 + if err := obj.MarshalCBOR(wc); err != nil { 288 + return fmt.Errorf("failed to write event: %w", err) 289 + } 290 + 291 + if err := wc.Close(); err != nil { 292 + return fmt.Errorf("failed to flush-close our event write: %w", err) 293 + } 294 + lastWriteLk.Lock() 295 + lastWrite = time.Now() 296 + lastWriteLk.Unlock() 297 + sentCounter.Inc() 298 + case <-ctx.Done(): 299 + return nil 300 + } 301 + } 302 + } 303 + 304 + type SocketConsumer struct { 305 + UserAgent string 306 + RemoteAddr string 307 + ConnectedAt time.Time 308 + EventsSent promclient.Counter 309 + } 310 + 311 + func (s *Splitter) registerConsumer(c *SocketConsumer) uint64 { 312 + s.consumersLk.Lock() 313 + defer s.consumersLk.Unlock() 314 + 315 + id := s.nextConsumerID 316 + s.nextConsumerID++ 317 + 318 + s.consumers[id] = c 319 + 320 + return id 321 + } 322 + 323 + func (s *Splitter) cleanupConsumer(id uint64) { 324 + s.consumersLk.Lock() 325 + defer s.consumersLk.Unlock() 326 + 327 + c := s.consumers[id] 328 + 329 + var m = &dto.Metric{} 330 + if err := c.EventsSent.Write(m); err != nil { 331 + log.Errorf("failed to get sent counter: %s", err) 332 + } 333 + 334 + log.Infow("consumer disconnected", 335 + "consumer_id", id, 336 + "remote_addr", c.RemoteAddr, 337 + "user_agent", c.UserAgent, 338 + "events_sent", m.Counter.GetValue()) 339 + 340 + delete(s.consumers, id) 341 + } 342 + 40 343 func sleepForBackoff(b int) time.Duration { 41 344 if b == 0 { 42 345 return 0 ··· 139 442 } 140 443 141 444 func (er *EventRingBuffer) Persist(ctx context.Context, evt *events.XRPCStreamEvent) error { 445 + fmt.Println("persist event", sequenceForEvent(evt)) 142 446 er.lk.Lock() 143 447 defer er.lk.Unlock() 144 448