this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

basic firehose consumer, for #identity events

+212 -11
+152
cmd/domesday/firehose.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "net/http" 7 + "net/url" 8 + "sync/atomic" 9 + "time" 10 + 11 + comatproto "github.com/bluesky-social/indigo/api/atproto" 12 + "github.com/bluesky-social/indigo/atproto/syntax" 13 + "github.com/bluesky-social/indigo/events/schedulers/parallel" 14 + 15 + "github.com/bluesky-social/indigo/events" 16 + "github.com/carlmjohnson/versioninfo" 17 + "github.com/gorilla/websocket" 18 + "github.com/redis/go-redis/v9" 19 + ) 20 + 21 + var firehoseCursorKey = "domes/firehoseSeq" 22 + 23 + func (srv *Server) RunFirehoseConsumer(ctx context.Context, host string, parallelism int) error { 24 + 25 + cur, err := srv.ReadLastCursor(ctx) 26 + if err != nil { 27 + return err 28 + } 29 + 30 + dialer := websocket.DefaultDialer 31 + u, err := url.Parse(host) 32 + if err != nil { 33 + return fmt.Errorf("invalid Host URI: %w", err) 34 + } 35 + u.Path = "xrpc/com.atproto.sync.subscribeRepos" 36 + if cur != 0 { 37 + u.RawQuery = fmt.Sprintf("cursor=%d", cur) 38 + } 39 + srv.logger.Info("subscribing to repo event stream", "upstream", host, "cursor", cur) 40 + con, _, err := dialer.Dial(u.String(), http.Header{ 41 + "User-Agent": []string{fmt.Sprintf("domesday/%s", versioninfo.Short())}, 42 + }) 43 + if err != nil { 44 + return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 45 + } 46 + 47 + rsc := &events.RepoStreamCallbacks{ 48 + RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 49 + atomic.StoreInt64(&srv.lastSeq, evt.Seq) 50 + ctx := context.Background() 51 + srv.logger.Info("flushing cache due to #identity firehose event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 52 + 53 + did, err := syntax.ParseDID(evt.Did) 54 + if err != nil { 55 + srv.logger.Warn("invalid DID in #identity event", "did", evt.Did, "seq", evt.Seq, "err", err) 56 + return nil 57 + } 58 + if err := srv.dir.PurgeDID(ctx, did); err != nil { 59 + srv.logger.Error("failed to purge DID from cache", "did", evt.Did, "seq", evt.Seq, "err", err) 60 + return nil 61 + } 62 + if evt.Handle == nil { 63 + return nil 64 + } 65 + handle, err := syntax.ParseHandle(*evt.Handle) 66 + if err != nil { 67 + srv.logger.Warn("invalid handle in #identity event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 68 + return nil 69 + } 70 + if err := srv.dir.PurgeHandle(ctx, handle); err != nil { 71 + srv.logger.Error("failed to purge handle from cache", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 72 + return nil 73 + } 74 + return nil 75 + }, 76 + } 77 + 78 + var scheduler events.Scheduler 79 + // use a fixed-parallelism scheduler if configured 80 + scheduler = parallel.NewScheduler( 81 + parallelism, 82 + 1000, 83 + host, 84 + rsc.EventHandler, 85 + ) 86 + srv.logger.Info("domesday firehose scheduler configured", "scheduler", "parallel", "initial", parallelism) 87 + 88 + return events.HandleRepoStream(ctx, con, scheduler, srv.logger) 89 + } 90 + 91 + func (srv *Server) ReadLastCursor(ctx context.Context) (int64, error) { 92 + // if redis isn't configured, just skip 93 + if srv.redisClient == nil { 94 + srv.logger.Info("redis not configured, skipping cursor read") 95 + return 0, nil 96 + } 97 + 98 + val, err := srv.redisClient.Get(ctx, firehoseCursorKey).Int64() 99 + if err == redis.Nil { 100 + srv.logger.Info("no pre-existing cursor in redis") 101 + return 0, nil 102 + } else if err != nil { 103 + return 0, err 104 + } 105 + srv.logger.Info("successfully found prior subscription cursor seq in redis", "seq", val) 106 + return val, nil 107 + } 108 + 109 + func (srv *Server) PersistCursor(ctx context.Context) error { 110 + // if redis isn't configured, just skip 111 + if srv.redisClient == nil { 112 + return nil 113 + } 114 + lastSeq := atomic.LoadInt64(&srv.lastSeq) 115 + if lastSeq <= 0 { 116 + return nil 117 + } 118 + err := srv.redisClient.Set(ctx, firehoseCursorKey, lastSeq, 14*24*time.Hour).Err() 119 + return err 120 + } 121 + 122 + // this method runs in a loop, persisting the current cursor state every 5 seconds 123 + func (srv *Server) RunPersistCursor(ctx context.Context) error { 124 + 125 + // if redis isn't configured, just skip 126 + if srv.redisClient == nil { 127 + return nil 128 + } 129 + ticker := time.NewTicker(5 * time.Second) 130 + for { 131 + select { 132 + case <-ctx.Done(): 133 + lastSeq := atomic.LoadInt64(&srv.lastSeq) 134 + if lastSeq >= 1 { 135 + srv.logger.Info("persisting final cursor seq value", "seq", lastSeq) 136 + err := srv.PersistCursor(ctx) 137 + if err != nil { 138 + srv.logger.Error("failed to persist cursor", "err", err, "seq", lastSeq) 139 + } 140 + } 141 + return nil 142 + case <-ticker.C: 143 + lastSeq := atomic.LoadInt64(&srv.lastSeq) 144 + if lastSeq >= 1 { 145 + err := srv.PersistCursor(ctx) 146 + if err != nil { 147 + srv.logger.Error("failed to persist cursor", "err", err, "seq", lastSeq) 148 + } 149 + } 150 + } 151 + } 152 + }
+33 -7
cmd/domesday/main.go
··· 82 82 Value: ":3989", 83 83 EnvVars: []string{"DOMESDAY_METRICS_LISTEN"}, 84 84 }, 85 + &cli.BoolFlag{ 86 + Name: "disable-firehose-consumer", 87 + Usage: "don't consume #identity events from firehose", 88 + EnvVars: []string{"DOMESDAY_DISABLE_FIREHOSE_CONSUMER"}, 89 + }, 90 + &cli.IntFlag{ 91 + Name: "firehose-parallelism", 92 + Usage: "number of concurrent firehose workers", 93 + Value: 4, 94 + EnvVars: []string{"HEPA_FIREHOSE_PARALLELISM"}, 95 + }, 85 96 }, 86 97 }, 87 98 &cli.Command{ ··· 173 184 174 185 func runServeCmd(cctx *cli.Context) error { 175 186 logger := configLogger(cctx, os.Stdout) 176 - //configOTEL("domesday") 187 + ctx := context.Background() 177 188 178 189 srv, err := NewServer( 179 190 Config{ 180 - Logger: logger, 181 - FirehoseHost: cctx.String("atp-relay-host"), 182 - RedisURL: cctx.String("redis-url"), 183 - Bind: cctx.String("bind"), 191 + Logger: logger, 192 + Bind: cctx.String("bind"), 193 + RedisURL: cctx.String("redis-url"), 184 194 }, 185 195 ) 186 196 if err != nil { 187 197 return fmt.Errorf("failed to construct server: %v", err) 188 198 } 189 199 200 + if !cctx.Bool("disable-firehose-consumer") { 201 + go func() { 202 + firehoseHost := cctx.String("atp-relay-host") 203 + firehoseParallelism := cctx.Int("firehose-parallelism") 204 + if err := srv.RunFirehoseConsumer(ctx, firehoseHost, firehoseParallelism); err != nil { 205 + slog.Error("firehose consumer thread failed", "err", err) 206 + // NOTE: not crashing or halting process here 207 + } 208 + }() 209 + go func() { 210 + if err := srv.RunPersistCursor(ctx); err != nil { 211 + slog.Error("firehose persist thread failed", "err", err) 212 + // NOTE: not crashing or halting process here 213 + } 214 + }() 215 + } 216 + 190 217 // prometheus HTTP endpoint: /metrics 191 218 go func() { 192 219 // TODO: what is this tuning for? just cargo-culted it ··· 194 221 runtime.SetMutexProfileFraction(10) 195 222 if err := srv.RunMetrics(cctx.String("metrics-listen")); err != nil { 196 223 slog.Error("failed to start metrics endpoint", "error", err) 197 - // XXX: really panic? hrm 198 - panic(fmt.Errorf("failed to start metrics endpoint: %w", err)) 224 + // NOTE: not crashing or halting process here 199 225 } 200 226 }() 201 227
+27 -4
cmd/domesday/server.go
··· 18 18 "github.com/labstack/echo/v4" 19 19 "github.com/labstack/echo/v4/middleware" 20 20 "github.com/prometheus/client_golang/prometheus/promhttp" 21 + "github.com/redis/go-redis/v9" 21 22 slogecho "github.com/samber/slog-echo" 22 23 "golang.org/x/time/rate" 23 24 ) ··· 27 28 echo *echo.Echo 28 29 httpd *http.Server 29 30 logger *slog.Logger 31 + 32 + // this redis client is used to store firehose offset 33 + redisClient *redis.Client 34 + 35 + // lastSeq is the most recent event sequence number we've received and begun to handle. 36 + // This number is periodically persisted to redis, if redis is present. 37 + // The value is best-effort (the stream handling itself is concurrent, so event numbers may not be monotonic), 38 + // but nonetheless, you must use atomics when updating or reading this (to avoid data races). 39 + lastSeq int64 30 40 } 31 41 32 42 type Config struct { 33 43 Logger *slog.Logger 34 - FirehoseHost string 35 44 PLCHost string 36 45 PLCRateLimit int 37 46 RedisURL string ··· 74 83 return nil, err 75 84 } 76 85 86 + // configure redis client (for firehose consumer) 87 + redisOpt, err := redis.ParseURL(config.RedisURL) 88 + if err != nil { 89 + return nil, fmt.Errorf("parsing redis URL: %v", err) 90 + } 91 + redisClient := redis.NewClient(redisOpt) 92 + // check redis connection 93 + _, err = redisClient.Ping(context.Background()).Result() 94 + if err != nil { 95 + return nil, fmt.Errorf("redis ping failed: %v", err) 96 + } 97 + 77 98 e := echo.New() 78 99 79 100 // httpd ··· 83 104 ) 84 105 85 106 srv := &Server{ 86 - echo: e, 87 - dir: redisDir, 88 - logger: logger, 107 + echo: e, 108 + dir: redisDir, 109 + logger: logger, 110 + redisClient: redisClient, 89 111 } 112 + 90 113 srv.httpd = &http.Server{ 91 114 Handler: srv, 92 115 Addr: config.Bind,