this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

hepa configurable parallelism (#584)

Makes both the firehose parallelism value and whether the scheduler auto-scales configurable.

authored by

bnewbold and committed by
GitHub
47edba75 c8787dd8

+82 -61
+20 -9
cmd/hepa/consumer.go
··· 13 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 14 "github.com/bluesky-social/indigo/automod" 15 15 "github.com/bluesky-social/indigo/events/schedulers/autoscaling" 16 + "github.com/bluesky-social/indigo/events/schedulers/parallel" 16 17 lexutil "github.com/bluesky-social/indigo/lex/util" 17 18 18 19 "github.com/bluesky-social/indigo/events" ··· 79 80 // TODO: other event callbacks as needed 80 81 } 81 82 82 - // start at higher parallelism (somewhat arbitrary) 83 - scaleSettings := autoscaling.DefaultAutoscaleSettings() 84 - scaleSettings.Concurrency = 6 85 - scaleSettings.MaxConcurrency = 240 86 - return events.HandleRepoStream( 87 - ctx, con, autoscaling.NewScheduler( 88 - scaleSettings, 83 + var scheduler events.Scheduler 84 + if s.firehoseParallelism > 0 { 85 + // use a fixed-parallelism scheduler if configured 86 + scheduler = parallel.NewScheduler( 87 + s.firehoseParallelism, 88 + 1000, 89 89 s.bgshost, 90 90 rsc.EventHandler, 91 - ), 92 - ) 91 + ) 92 + s.logger.Info("hepa scheduler configured", "scheduler", "parallel", "initial", s.firehoseParallelism) 93 + } else { 94 + // otherwise use auto-scaling scheduler 95 + scaleSettings := autoscaling.DefaultAutoscaleSettings() 96 + // start at higher parallelism (somewhat arbitrary) 97 + scaleSettings.Concurrency = 4 98 + scaleSettings.MaxConcurrency = 200 99 + scheduler = autoscaling.NewScheduler(scaleSettings, s.bgshost, rsc.EventHandler) 100 + s.logger.Info("hepa scheduler configured", "scheduler", "autoscaling", "initial", scaleSettings.Concurrency, "max", scaleSettings.MaxConcurrency) 101 + } 102 + 103 + return events.HandleRepoStream(ctx, con, scheduler) 93 104 } 94 105 95 106 // TODO: move this to a "ParsePath" helper in syntax package?
+36 -29
cmd/hepa/main.go
··· 127 127 Usage: "HTTP header to bypass ratelimits", 128 128 EnvVars: []string{"HEPA_RATELIMIT_BYPASS", "RATELIMIT_BYPASS"}, 129 129 }, 130 + &cli.IntFlag{ 131 + Name: "firehose-parallelism", 132 + Usage: "force a fixed number of parallel firehose workers. default (or 0) for auto-scaling; 200 works for a large instance", 133 + EnvVars: []string{"HEPA_FIREHOSE_PARALLELISM"}, 134 + }, 130 135 } 131 136 132 137 app.Commands = []*cli.Command{ ··· 214 219 srv, err := NewServer( 215 220 dir, 216 221 Config{ 217 - BGSHost: cctx.String("atp-bgs-host"), 218 - BskyHost: cctx.String("atp-bsky-host"), 219 - Logger: logger, 220 - ModHost: cctx.String("atp-mod-host"), 221 - ModAdminToken: cctx.String("mod-admin-token"), 222 - ModUsername: cctx.String("mod-handle"), 223 - ModPassword: cctx.String("mod-password"), 224 - SetsFileJSON: cctx.String("sets-json-path"), 225 - RedisURL: cctx.String("redis-url"), 226 - SlackWebhookURL: cctx.String("slack-webhook-url"), 227 - HiveAPIToken: cctx.String("hiveai-api-token"), 228 - AbyssHost: cctx.String("abyss-host"), 229 - AbyssPassword: cctx.String("abyss-password"), 230 - RatelimitBypass: cctx.String("ratelimit-bypass"), 231 - RulesetName: cctx.String("ruleset"), 222 + BGSHost: cctx.String("atp-bgs-host"), 223 + BskyHost: cctx.String("atp-bsky-host"), 224 + Logger: logger, 225 + ModHost: cctx.String("atp-mod-host"), 226 + ModAdminToken: cctx.String("mod-admin-token"), 227 + ModUsername: cctx.String("mod-handle"), 228 + ModPassword: cctx.String("mod-password"), 229 + SetsFileJSON: cctx.String("sets-json-path"), 230 + RedisURL: cctx.String("redis-url"), 231 + SlackWebhookURL: cctx.String("slack-webhook-url"), 232 + HiveAPIToken: cctx.String("hiveai-api-token"), 233 + AbyssHost: cctx.String("abyss-host"), 234 + AbyssPassword: cctx.String("abyss-password"), 235 + RatelimitBypass: cctx.String("ratelimit-bypass"), 236 + RulesetName: cctx.String("ruleset"), 237 + FirehoseParallelism: cctx.Int("firehose-parallelism"), 232 238 }, 233 239 ) 234 
240 if err != nil { ··· 280 286 return NewServer( 281 287 dir, 282 288 Config{ 283 - BGSHost: cctx.String("atp-bgs-host"), 284 - BskyHost: cctx.String("atp-bsky-host"), 285 - Logger: logger, 286 - ModHost: cctx.String("atp-mod-host"), 287 - ModAdminToken: cctx.String("mod-admin-token"), 288 - ModUsername: cctx.String("mod-handle"), 289 - ModPassword: cctx.String("mod-password"), 290 - SetsFileJSON: cctx.String("sets-json-path"), 291 - RedisURL: cctx.String("redis-url"), 292 - HiveAPIToken: cctx.String("hiveai-api-token"), 293 - AbyssHost: cctx.String("abyss-host"), 294 - AbyssPassword: cctx.String("abyss-password"), 295 - RatelimitBypass: cctx.String("ratelimit-bypass"), 296 - RulesetName: cctx.String("ruleset"), 289 + BGSHost: cctx.String("atp-bgs-host"), 290 + BskyHost: cctx.String("atp-bsky-host"), 291 + Logger: logger, 292 + ModHost: cctx.String("atp-mod-host"), 293 + ModAdminToken: cctx.String("mod-admin-token"), 294 + ModUsername: cctx.String("mod-handle"), 295 + ModPassword: cctx.String("mod-password"), 296 + SetsFileJSON: cctx.String("sets-json-path"), 297 + RedisURL: cctx.String("redis-url"), 298 + HiveAPIToken: cctx.String("hiveai-api-token"), 299 + AbyssHost: cctx.String("abyss-host"), 300 + AbyssPassword: cctx.String("abyss-password"), 301 + RatelimitBypass: cctx.String("ratelimit-bypass"), 302 + RulesetName: cctx.String("ruleset"), 303 + FirehoseParallelism: cctx.Int("firehose-parallelism"), 297 304 }, 298 305 ) 299 306 }
+26 -23
cmd/hepa/server.go
··· 27 27 ) 28 28 29 29 type Server struct { 30 - bgshost string 31 - logger *slog.Logger 32 - engine *automod.Engine 33 - rdb *redis.Client 30 + bgshost string 31 + firehoseParallelism int 32 + logger *slog.Logger 33 + engine *automod.Engine 34 + rdb *redis.Client 34 35 35 36 // lastSeq is the most recent event sequence number we've received and begun to handle. 36 37 // This number is periodically persisted to redis, if redis is present. ··· 40 41 } 41 42 42 43 type Config struct { 43 - BGSHost string 44 - BskyHost string 45 - ModHost string 46 - ModAdminToken string 47 - ModUsername string 48 - ModPassword string 49 - SetsFileJSON string 50 - RedisURL string 51 - SlackWebhookURL string 52 - HiveAPIToken string 53 - AbyssHost string 54 - AbyssPassword string 55 - RulesetName string 56 - RatelimitBypass string 57 - Logger *slog.Logger 44 + BGSHost string 45 + BskyHost string 46 + ModHost string 47 + ModAdminToken string 48 + ModUsername string 49 + ModPassword string 50 + SetsFileJSON string 51 + RedisURL string 52 + SlackWebhookURL string 53 + HiveAPIToken string 54 + AbyssHost string 55 + AbyssPassword string 56 + RulesetName string 57 + RatelimitBypass string 58 + FirehoseParallelism int 59 + Logger *slog.Logger 58 60 } 59 61 60 62 func NewServer(dir identity.Directory, config Config) (*Server, error) { ··· 204 206 } 205 207 206 208 s := &Server{ 207 - bgshost: config.BGSHost, 208 - logger: logger, 209 - engine: &engine, 210 - rdb: rdb, 209 + bgshost: config.BGSHost, 210 + firehoseParallelism: config.FirehoseParallelism, 211 + logger: logger, 212 + engine: &engine, 213 + rdb: rdb, 211 214 } 212 215 213 216 return s, nil