this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

WIP: creating bgs splitter daemon

+365
+147
cmd/splitter/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "os" 6 + "os/signal" 7 + "syscall" 8 + "time" 9 + 10 + "github.com/bluesky-social/indigo/bgs" 11 + "github.com/bluesky-social/indigo/util/version" 12 + _ "go.uber.org/automaxprocs" 13 + 14 + _ "net/http/pprof" 15 + 16 + _ "github.com/joho/godotenv/autoload" 17 + 18 + logging "github.com/ipfs/go-log" 19 + "github.com/urfave/cli/v2" 20 + "go.opentelemetry.io/otel" 21 + "go.opentelemetry.io/otel/attribute" 22 + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" 23 + "go.opentelemetry.io/otel/sdk/resource" 24 + tracesdk "go.opentelemetry.io/otel/sdk/trace" 25 + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" 26 + ) 27 + 28 + var log = logging.Logger("splitter") 29 + 30 + func init() { 31 + // control log level using, eg, GOLOG_LOG_LEVEL=debug 32 + //logging.SetAllLoggers(logging.LevelDebug) 33 + } 34 + 35 + func main() { 36 + run(os.Args) 37 + } 38 + 39 + func run(args []string) { 40 + app := cli.App{ 41 + Name: "splitter", 42 + Usage: "firehose proxy", 43 + Version: version.Version, 44 + } 45 + 46 + app.Flags = []cli.Flag{ 47 + &cli.BoolFlag{ 48 + Name: "crawl-insecure-ws", 49 + Usage: "when connecting to PDS instances, use ws:// instead of wss://", 50 + }, 51 + &cli.StringFlag{ 52 + Name: "api-listen", 53 + Value: ":2480", 54 + }, 55 + &cli.StringFlag{ 56 + Name: "metrics-listen", 57 + Value: ":2481", 58 + EnvVars: []string{"SPLITTER_METRICS_LISTEN"}, 59 + }, 60 + } 61 + 62 + app.Action = Splitter 63 + err := app.Run(os.Args) 64 + if err != nil { 65 + log.Fatal(err) 66 + } 67 + } 68 + 69 + func Splitter(cctx *cli.Context) error { 70 + // Trap SIGINT to trigger a shutdown. 71 + signals := make(chan os.Signal, 1) 72 + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 73 + 74 + // Enable OTLP HTTP exporter 75 + // For relevant environment variables: 76 + // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables 77 + // At a minimum, you need to set 78 + // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 79 + if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" { 80 + log.Infow("setting up trace exporter", "endpoint", ep) 81 + ctx, cancel := context.WithCancel(context.Background()) 82 + defer cancel() 83 + 84 + exp, err := otlptracehttp.New(ctx) 85 + if err != nil { 86 + log.Fatalw("failed to create trace exporter", "error", err) 87 + } 88 + defer func() { 89 + ctx, cancel := context.WithTimeout(context.Background(), time.Second) 90 + defer cancel() 91 + if err := exp.Shutdown(ctx); err != nil { 92 + log.Errorw("failed to shutdown trace exporter", "error", err) 93 + } 94 + }() 95 + 96 + tp := tracesdk.NewTracerProvider( 97 + tracesdk.WithBatcher(exp), 98 + tracesdk.WithResource(resource.NewWithAttributes( 99 + semconv.SchemaURL, 100 + semconv.ServiceNameKey.String("splitter"), 101 + attribute.String("env", os.Getenv("ENVIRONMENT")), // DataDog 102 + attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others 103 + attribute.Int64("ID", 1), 104 + )), 105 + ) 106 + otel.SetTracerProvider(tp) 107 + } 108 + 109 + spl := splitter.New(cctx.String("bgs-host")) 110 + 111 + // set up metrics endpoint 112 + go func() { 113 + if err := spl.StartMetrics(cctx.String("metrics-listen")); err != nil { 114 + log.Fatalf("failed to start metrics endpoint: %s", err) 115 + } 116 + }() 117 + 118 + runErr := make(chan error, 1) 119 + 120 + go func() { 121 + err := spl.Start(cctx.String("api-listen")) 122 + runErr <- err 123 + }() 124 + 125 + log.Infow("startup complete") 126 + select { 127 + case <-signals: 128 + log.Info("received shutdown signal") 129 + errs := spl.Shutdown() 130 + for err := range errs { 131 + log.Errorw("error during Splitter shutdown", "err", err) 132 + } 133 + case err := <-runErr: 134 + if err != nil { 135 + log.Errorw("error during Splitter startup", "err", err) 136 + } 137 + log.Info("shutting down") 138 + errs := bgs.Shutdown() 139 + for err := range errs { 140 + log.Errorw("error during Splitter shutdown", "err", err) 141 + } 142 + } 143 + 144 + log.Info("shutdown complete") 145 + 146 + return nil 147 + }
+218
splitter/splitter.go
··· 1 + package splitter 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "math/rand" 7 + "sync" 8 + "time" 9 + 10 + events "github.com/bluesky-social/indigo/events" 11 + "github.com/bluesky-social/indigo/events/schedulers/sequential" 12 + "github.com/bluesky-social/indigo/models" 13 + "github.com/gorilla/websocket" 14 + logging "github.com/ipfs/go-log" 15 + ) 16 + 17 + var log = logging.Logger("splitter") 18 + 19 + type Splitter struct { 20 + Host string 21 + erb *EventRingBuffer 22 + events *events.EventManager 23 + } 24 + 25 + func NewSplitter(host string, persister events.EventPersistence) *Splitter { 26 + erb := NewEventRingBuffer(20000, 1000) 27 + 28 + em := events.NewEventManager(erb) 29 + return &Splitter{ 30 + Host: host, 31 + erb: erb, 32 + events: em, 33 + } 34 + } 35 + 36 + func (s *Splitter) Start() error { 37 + return nil 38 + } 39 + 40 + func sleepForBackoff(b int) time.Duration { 41 + if b == 0 { 42 + return 0 43 + } 44 + 45 + if b < 50 { 46 + return time.Millisecond * time.Duration(rand.Intn(100)+(5*b)) 47 + } 48 + 49 + return time.Second * 5 50 + } 51 + 52 + func (s *Splitter) subscribeWithRedialer(ctx context.Context, host string, cursor int64) { 53 + d := websocket.Dialer{} 54 + 55 + protocol := "wss" 56 + 57 + var backoff int 58 + for { 59 + select { 60 + case <-ctx.Done(): 61 + return 62 + default: 63 + } 64 + 65 + url := fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", protocol, host, cursor) 66 + con, res, err := d.DialContext(ctx, url, nil) 67 + if err != nil { 68 + log.Warnw("dialing failed", "host", host, "err", err, "backoff", backoff) 69 + time.Sleep(sleepForBackoff(backoff)) 70 + backoff++ 71 + 72 + continue 73 + } 74 + 75 + log.Info("event subscription response code: ", res.StatusCode) 76 + 77 + if err := s.handleConnection(ctx, host, con, &cursor); err != nil { 78 + log.Warnf("connection to %q failed: %s", host, err) 79 + } 80 + } 81 + } 82 + 83 + func (s *Splitter) handleConnection(ctx context.Context, host string, con *websocket.Conn, lastCursor *int64) error { 84 + ctx, cancel := context.WithCancel(ctx) 85 + defer cancel() 86 + 87 + sched := sequential.NewScheduler("splitter", s.events.AddEvent) 88 + return events.HandleRepoStream(ctx, con, sched) 89 + } 90 + 91 + func sequenceForEvent(evt *events.XRPCStreamEvent) int64 { 92 + switch { 93 + case evt.RepoCommit != nil: 94 + return evt.RepoCommit.Seq 95 + case evt.RepoHandle != nil: 96 + return evt.RepoHandle.Seq 97 + case evt.RepoMigrate != nil: 98 + return evt.RepoMigrate.Seq 99 + case evt.RepoTombstone != nil: 100 + return evt.RepoTombstone.Seq 101 + case evt.RepoInfo != nil: 102 + return -1 103 + default: 104 + return -1 105 + } 106 + } 107 + 108 + func NewEventRingBuffer(chunkSize, nchunks int) *EventRingBuffer { 109 + return &EventRingBuffer{ 110 + chunkSize: chunkSize, 111 + maxChunkCount: nchunks, 112 + } 113 + } 114 + 115 + type EventRingBuffer struct { 116 + lk sync.Mutex 117 + chunks []*ringChunk 118 + chunkSize int 119 + maxChunkCount int 120 + 121 + broadcast func(*events.XRPCStreamEvent) 122 + } 123 + 124 + type ringChunk struct { 125 + lk sync.Mutex 126 + buf []*events.XRPCStreamEvent 127 + } 128 + 129 + func (rc *ringChunk) append(evt *events.XRPCStreamEvent) { 130 + rc.lk.Lock() 131 + defer rc.lk.Unlock() 132 + rc.buf = append(rc.buf, evt) 133 + } 134 + 135 + func (rc *ringChunk) events() []*events.XRPCStreamEvent { 136 + rc.lk.Lock() 137 + defer rc.lk.Unlock() 138 + return rc.buf 139 + } 140 + 141 + func (er *EventRingBuffer) Persist(ctx context.Context, evt *events.XRPCStreamEvent) error { 142 + er.lk.Lock() 143 + defer er.lk.Unlock() 144 + 145 + if len(er.chunks) == 0 { 146 + er.chunks = []*ringChunk{new(ringChunk)} 147 + } 148 + 149 + last := er.chunks[len(er.chunks)-1] 150 + if len(last.buf) >= er.chunkSize { 151 + last = new(ringChunk) 152 + er.chunks = append(er.chunks, last) 153 + if len(er.chunks) > er.maxChunkCount { 154 + er.chunks = er.chunks[1:] 155 + } 156 + } 157 + 158 + last.append(evt) 159 + 160 + er.broadcast(evt) 161 + return nil 162 + } 163 + 164 + func (er *EventRingBuffer) Flush(context.Context) error { 165 + return nil 166 + } 167 + 168 + func (er *EventRingBuffer) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { 169 + // grab a snapshot of the current chunks 170 + er.lk.Lock() 171 + chunks := er.chunks 172 + er.lk.Unlock() 173 + 174 + i := len(chunks) - 1 175 + for ; i >= 0; i-- { 176 + c := chunks[i] 177 + evts := c.events() 178 + if since > sequenceForEvent(evts[len(evts)-1]) { 179 + i++ 180 + break 181 + } 182 + } 183 + 184 + for _, c := range chunks[i:] { 185 + var nread int 186 + evts := c.events() 187 + for nread < len(evts) { 188 + for _, e := range evts[nread:] { 189 + if since > 0 && sequenceForEvent(e) < since { 190 + continue 191 + } 192 + since = 0 193 + 194 + if err := cb(e); err != nil { 195 + return err 196 + } 197 + } 198 + 199 + // recheck evts buffer to see if more were added while we were here 200 + evts = c.events() 201 + } 202 + } 203 + 204 + // TODO: probably also check for if new chunks were added while we were iterating... 205 + return nil 206 + } 207 + 208 + func (er *EventRingBuffer) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) { 209 + er.broadcast = brc 210 + } 211 + 212 + func (er *EventRingBuffer) Shutdown(context.Context) error { 213 + return nil 214 + } 215 + 216 + func (er *EventRingBuffer) TakeDownRepo(context.Context, models.Uid) error { 217 + return nil 218 + }