this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix edge cases around playback-to-live transition

+361 -111
+149
cmd/rainbow/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "os" 6 + "os/signal" 7 + "syscall" 8 + "time" 9 + 10 + "github.com/bluesky-social/indigo/splitter" 11 + "github.com/bluesky-social/indigo/util/version" 12 + _ "go.uber.org/automaxprocs" 13 + 14 + _ "net/http/pprof" 15 + 16 + _ "github.com/joho/godotenv/autoload" 17 + 18 + logging "github.com/ipfs/go-log" 19 + "github.com/urfave/cli/v2" 20 + "go.opentelemetry.io/otel" 21 + "go.opentelemetry.io/otel/attribute" 22 + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" 23 + "go.opentelemetry.io/otel/sdk/resource" 24 + tracesdk "go.opentelemetry.io/otel/sdk/trace" 25 + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" 26 + ) 27 + 28 + var log = logging.Logger("splitter") 29 + 30 + func init() { 31 + // control log level using, eg, GOLOG_LOG_LEVEL=debug 32 + logging.SetAllLoggers(logging.LevelDebug) 33 + } 34 + 35 + func main() { 36 + run(os.Args) 37 + } 38 + 39 + func run(args []string) { 40 + app := cli.App{ 41 + Name: "splitter", 42 + Usage: "firehose proxy", 43 + Version: version.Version, 44 + } 45 + 46 + app.Flags = []cli.Flag{ 47 + &cli.BoolFlag{ 48 + Name: "crawl-insecure-ws", 49 + Usage: "when connecting to PDS instances, use ws:// instead of wss://", 50 + }, 51 + &cli.StringFlag{ 52 + Name: "splitter-host", 53 + Value: "bsky.network", 54 + }, 55 + &cli.StringFlag{ 56 + Name: "api-listen", 57 + Value: ":2480", 58 + }, 59 + &cli.StringFlag{ 60 + Name: "metrics-listen", 61 + Value: ":2481", 62 + EnvVars: []string{"SPLITTER_METRICS_LISTEN"}, 63 + }, 64 + } 65 + 66 + app.Action = Splitter 67 + err := app.Run(os.Args) 68 + if err != nil { 69 + log.Fatal(err) 70 + } 71 + } 72 + 73 + func Splitter(cctx *cli.Context) error { 74 + // Trap SIGINT to trigger a shutdown. 75 + signals := make(chan os.Signal, 1) 76 + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 77 + 78 + // Enable OTLP HTTP exporter 79 + // For relevant environment variables: 80 + // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables 81 + // At a minimum, you need to set 82 + // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 83 + if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" { 84 + log.Infow("setting up trace exporter", "endpoint", ep) 85 + ctx, cancel := context.WithCancel(context.Background()) 86 + defer cancel() 87 + 88 + exp, err := otlptracehttp.New(ctx) 89 + if err != nil { 90 + log.Fatalw("failed to create trace exporter", "error", err) 91 + } 92 + defer func() { 93 + ctx, cancel := context.WithTimeout(context.Background(), time.Second) 94 + defer cancel() 95 + if err := exp.Shutdown(ctx); err != nil { 96 + log.Errorw("failed to shutdown trace exporter", "error", err) 97 + } 98 + }() 99 + 100 + tp := tracesdk.NewTracerProvider( 101 + tracesdk.WithBatcher(exp), 102 + tracesdk.WithResource(resource.NewWithAttributes( 103 + semconv.SchemaURL, 104 + semconv.ServiceNameKey.String("splitter"), 105 + attribute.String("env", os.Getenv("ENVIRONMENT")), // DataDog 106 + attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others 107 + attribute.Int64("ID", 1), 108 + )), 109 + ) 110 + otel.SetTracerProvider(tp) 111 + } 112 + 113 + spl := splitter.NewSplitter(cctx.String("splitter-host")) 114 + 115 + // set up metrics endpoint 116 + go func() { 117 + if err := spl.StartMetrics(cctx.String("metrics-listen")); err != nil { 118 + log.Fatalf("failed to start metrics endpoint: %s", err) 119 + } 120 + }() 121 + 122 + runErr := make(chan error, 1) 123 + 124 + go func() { 125 + err := spl.Start(cctx.String("api-listen")) 126 + runErr <- err 127 + }() 128 + 129 + log.Infow("startup complete") 130 + select { 131 + case <-signals: 132 + log.Info("received shutdown signal") 133 + if err := spl.Shutdown(); err != nil { 134 + log.Errorw("error during Splitter shutdown", "err", err) 135 + } 136 + case err := <-runErr: 137 + if err != nil { 138 + log.Errorw("error during Splitter startup", "err", err) 139 + } 140 + log.Info("shutting down") 141 + if err := spl.Shutdown(); err != nil { 142 + log.Errorw("error during Splitter shutdown", "err", err) 143 + } 144 + } 145 + 146 + log.Info("shutdown complete") 147 + 148 + return nil 149 + }
+11
splitter/metrics.go
··· 1 + package splitter 2 + 3 + import ( 4 + "github.com/prometheus/client_golang/prometheus" 5 + "github.com/prometheus/client_golang/prometheus/promauto" 6 + ) 7 + 8 + var eventsSentCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 9 + Name: "spl_events_sent_counter", 10 + Help: "The total number of events sent to consumers", 11 + }, []string{"remote_addr", "user_agent"})
+144
splitter/ringbuf.go
··· 1 + package splitter 2 + 3 + import ( 4 + "context" 5 + "sync" 6 + 7 + events "github.com/bluesky-social/indigo/events" 8 + "github.com/bluesky-social/indigo/models" 9 + ) 10 + 11 + func NewEventRingBuffer(chunkSize, nchunks int) *EventRingBuffer { 12 + return &EventRingBuffer{ 13 + chunkSize: chunkSize, 14 + maxChunkCount: nchunks, 15 + } 16 + } 17 + 18 + type EventRingBuffer struct { 19 + lk sync.Mutex 20 + chunks []*ringChunk 21 + chunkSize int 22 + maxChunkCount int 23 + 24 + broadcast func(*events.XRPCStreamEvent) 25 + } 26 + 27 + type ringChunk struct { 28 + lk sync.Mutex 29 + buf []*events.XRPCStreamEvent 30 + } 31 + 32 + func (rc *ringChunk) append(evt *events.XRPCStreamEvent) { 33 + rc.lk.Lock() 34 + defer rc.lk.Unlock() 35 + rc.buf = append(rc.buf, evt) 36 + } 37 + 38 + func (rc *ringChunk) events() []*events.XRPCStreamEvent { 39 + rc.lk.Lock() 40 + defer rc.lk.Unlock() 41 + return rc.buf 42 + } 43 + 44 + func (er *EventRingBuffer) Persist(ctx context.Context, evt *events.XRPCStreamEvent) error { 45 + er.lk.Lock() 46 + defer er.lk.Unlock() 47 + 48 + if len(er.chunks) == 0 { 49 + er.chunks = []*ringChunk{new(ringChunk)} 50 + } 51 + 52 + last := er.chunks[len(er.chunks)-1] 53 + if len(last.buf) >= er.chunkSize { 54 + last = new(ringChunk) 55 + er.chunks = append(er.chunks, last) 56 + if len(er.chunks) > er.maxChunkCount { 57 + er.chunks = er.chunks[1:] 58 + } 59 + } 60 + 61 + last.append(evt) 62 + 63 + er.broadcast(evt) 64 + return nil 65 + } 66 + 67 + func (er *EventRingBuffer) Flush(context.Context) error { 68 + return nil 69 + } 70 + 71 + func (er *EventRingBuffer) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { 72 + // run playback a few times to get as close to 'live' as possible before returning 73 + for i := 0; i < 10; i++ { 74 + n, err := er.playbackRound(ctx, since, cb) 75 + if err != nil { 76 + return err 77 + } 78 + 79 + // playback had no new events 80 + if n-since == 0 { 81 + return nil 82 + } 83 + since = n 84 + } 85 + 86 + return nil 87 + } 88 + 89 + func (er *EventRingBuffer) playbackRound(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) (int64, error) { 90 + // grab a snapshot of the current chunks 91 + er.lk.Lock() 92 + chunks := er.chunks 93 + er.lk.Unlock() 94 + 95 + i := len(chunks) - 1 96 + for ; i >= 0; i-- { 97 + c := chunks[i] 98 + evts := c.events() 99 + if since > sequenceForEvent(evts[len(evts)-1]) { 100 + i++ 101 + break 102 + } 103 + } 104 + if i < 0 { 105 + i = 0 106 + } 107 + 108 + var lastSeq int64 = since 109 + for _, c := range chunks[i:] { 110 + var nread int 111 + evts := c.events() 112 + for nread < len(evts) { 113 + for _, e := range evts[nread:] { 114 + nread++ 115 + seq := sequenceForEvent(e) 116 + if seq <= since { 117 + continue 118 + } 119 + 120 + if err := cb(e); err != nil { 121 + return 0, err 122 + } 123 + lastSeq = seq 124 + } 125 + 126 + // recheck evts buffer to see if more were added while we were here 127 + evts = c.events() 128 + } 129 + } 130 + 131 + return lastSeq, nil 132 + } 133 + 134 + func (er *EventRingBuffer) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) { 135 + er.broadcast = brc 136 + } 137 + 138 + func (er *EventRingBuffer) Shutdown(context.Context) error { 139 + return nil 140 + } 141 + 142 + func (er *EventRingBuffer) TakeDownRepo(context.Context, models.Uid) error { 143 + return nil 144 + }
+57 -111
splitter/splitter.go
··· 3 3 import ( 4 4 "context" 5 5 "fmt" 6 + "io" 6 7 "math/rand" 7 8 "net" 8 9 "net/http" 10 + "os" 9 11 "strconv" 10 12 "strings" 11 13 "sync" ··· 15 17 events "github.com/bluesky-social/indigo/events" 16 18 "github.com/bluesky-social/indigo/events/schedulers/sequential" 17 19 lexutil "github.com/bluesky-social/indigo/lex/util" 18 - "github.com/bluesky-social/indigo/models" 19 20 "github.com/gorilla/websocket" 20 21 logging "github.com/ipfs/go-log" 21 22 "github.com/labstack/echo/v4" ··· 31 32 Host string 32 33 erb *EventRingBuffer 33 34 events *events.EventManager 35 + 36 + // cursor storage 37 + cursorFile string 34 38 35 39 // Management of Socket Consumers 36 40 consumersLk sync.RWMutex ··· 39 43 } 40 44 41 45 func NewSplitter(host string) *Splitter { 42 - erb := NewEventRingBuffer(20000, 1000) 46 + erb := NewEventRingBuffer(20_000, 1000) 43 47 44 48 em := events.NewEventManager(erb) 45 49 return &Splitter{ 46 - Host: host, 47 - erb: erb, 48 - events: em, 49 - consumers: make(map[uint64]*SocketConsumer), 50 + cursorFile: "cursor-file", 51 + Host: host, 52 + erb: erb, 53 + events: em, 54 + consumers: make(map[uint64]*SocketConsumer), 50 55 } 51 56 } 52 57 ··· 55 60 ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) 56 61 defer cancel() 57 62 58 - go s.subscribeWithRedialer(context.Background(), s.Host, 0) 63 + curs, err := s.getLastCursor() 64 + if err != nil { 65 + return fmt.Errorf("loading cursor failed: %w", err) 66 + } 67 + 68 + go s.subscribeWithRedialer(context.Background(), s.Host, curs) 59 69 60 70 li, err := lc.Listen(ctx, "tcp", addr) 61 71 if err != nil { ··· 365 375 default: 366 376 } 367 377 378 + header := http.Header{ 379 + "User-Agent": []string{"bgs-rainbow-v0"}, 380 + } 381 + 368 382 url := fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", protocol, host, cursor) 369 - con, res, err := d.DialContext(ctx, url, nil) 383 + con, res, err := d.DialContext(ctx, url, header) 370 384 if err != nil { 371 385 log.Warnw("dialing failed", "host", host, "err", err, "backoff", backoff) 372 386 time.Sleep(sleepForBackoff(backoff)) ··· 387 401 ctx, cancel := context.WithCancel(ctx) 388 402 defer cancel() 389 403 390 - sched := sequential.NewScheduler("splitter", s.events.AddEvent) 404 + sched := sequential.NewScheduler("splitter", func(ctx context.Context, evt *events.XRPCStreamEvent) error { 405 + seq := sequenceForEvent(evt) 406 + if seq < 0 { 407 + // ignore info events and other unsupported types 408 + return nil 409 + } 410 + 411 + if err := s.events.AddEvent(ctx, evt); err != nil { 412 + return err 413 + } 414 + 415 + if seq%5000 == 0 { 416 + if err := s.writeCursor(seq); err != nil { 417 + log.Errorf("write cursor failed: %s", err) 418 + } 419 + } 420 + 421 + *lastCursor = seq 422 + return nil 423 + }) 391 424 return events.HandleRepoStream(ctx, con, sched) 392 425 } 393 426 ··· 408 441 } 409 442 } 410 443 411 - func NewEventRingBuffer(chunkSize, nchunks int) *EventRingBuffer { 412 - return &EventRingBuffer{ 413 - chunkSize: chunkSize, 414 - maxChunkCount: nchunks, 415 - } 416 - } 417 - 418 - type EventRingBuffer struct { 419 - lk sync.Mutex 420 - chunks []*ringChunk 421 - chunkSize int 422 - maxChunkCount int 423 - 424 - broadcast func(*events.XRPCStreamEvent) 425 - } 426 - 427 - type ringChunk struct { 428 - lk sync.Mutex 429 - buf []*events.XRPCStreamEvent 430 - } 431 - 432 - func (rc *ringChunk) append(evt *events.XRPCStreamEvent) { 433 - rc.lk.Lock() 434 - defer rc.lk.Unlock() 435 - rc.buf = append(rc.buf, evt) 436 - } 437 - 438 - func (rc *ringChunk) events() []*events.XRPCStreamEvent { 439 - rc.lk.Lock() 440 - defer rc.lk.Unlock() 441 - return rc.buf 442 - } 443 - 444 - func (er *EventRingBuffer) Persist(ctx context.Context, evt *events.XRPCStreamEvent) error { 445 - fmt.Println("persist event", sequenceForEvent(evt)) 446 - er.lk.Lock() 447 - defer er.lk.Unlock() 448 - 449 - if len(er.chunks) == 0 { 450 - er.chunks = []*ringChunk{new(ringChunk)} 451 - } 452 - 453 - last := er.chunks[len(er.chunks)-1] 454 - if len(last.buf) >= er.chunkSize { 455 - last = new(ringChunk) 456 - er.chunks = append(er.chunks, last) 457 - if len(er.chunks) > er.maxChunkCount { 458 - er.chunks = er.chunks[1:] 444 + func (s *Splitter) getLastCursor() (int64, error) { 445 + fi, err := os.Open(s.cursorFile) 446 + if err != nil { 447 + if os.IsNotExist(err) { 448 + return 0, nil 459 449 } 450 + return 0, err 460 451 } 461 452 462 - last.append(evt) 463 - 464 - er.broadcast(evt) 465 - return nil 466 - } 467 - 468 - func (er *EventRingBuffer) Flush(context.Context) error { 469 - return nil 470 - } 471 - 472 - func (er *EventRingBuffer) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { 473 - // grab a snapshot of the current chunks 474 - er.lk.Lock() 475 - chunks := er.chunks 476 - er.lk.Unlock() 477 - 478 - i := len(chunks) - 1 479 - for ; i >= 0; i-- { 480 - c := chunks[i] 481 - evts := c.events() 482 - if since > sequenceForEvent(evts[len(evts)-1]) { 483 - i++ 484 - break 485 - } 453 + b, err := io.ReadAll(fi) 454 + if err != nil { 455 + return 0, err 486 456 } 487 457 488 - for _, c := range chunks[i:] { 489 - var nread int 490 - evts := c.events() 491 - for nread < len(evts) { 492 - for _, e := range evts[nread:] { 493 - if since > 0 && sequenceForEvent(e) < since { 494 - continue 495 - } 496 - since = 0 497 - 498 - if err := cb(e); err != nil { 499 - return err 500 - } 501 - } 502 - 503 - // recheck evts buffer to see if more were added while we were here 504 - evts = c.events() 505 - } 458 + v, err := strconv.ParseInt(string(b), 10, 64) 459 + if err != nil { 460 + return 0, err 506 461 } 507 462 508 - // TODO: probably also check for if new chunks were added while we were iterating... 509 - return nil 510 - } 511 - 512 - func (er *EventRingBuffer) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) { 513 - er.broadcast = brc 463 + return v, nil 514 464 } 515 465 516 - func (er *EventRingBuffer) Shutdown(context.Context) error { 517 - return nil 518 - } 519 - 520 - func (er *EventRingBuffer) TakeDownRepo(context.Context, models.Uid) error { 521 - return nil 466 + func (s *Splitter) writeCursor(curs int64) error { 467 + return os.WriteFile(s.cursorFile, []byte(fmt.Sprint(curs)), 0664) 522 468 }