this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

hepa: fix routine order (#790)

authored by

bnewbold and committed by
GitHub
4c9a9a70 f336c8ec

+22 -22
+22 -22
cmd/hepa/main.go
··· 261 261 return fmt.Errorf("failed to construct server: %v", err) 262 262 } 263 263 264 - // firehose event consumer 265 - relayHost := cctx.String("atp-relay-host") 266 - if relayHost != "" { 267 - fc := consumer.FirehoseConsumer{ 268 - Engine: srv.Engine, 269 - Logger: logger.With("subsystem", "firehose-consumer"), 270 - Host: cctx.String("atp-relay-host"), 271 - Parallelism: cctx.Int("firehose-parallelism"), 272 - RedisClient: srv.RedisClient, 273 - } 274 - 275 - go func() { 276 - if err := fc.RunPersistCursor(ctx); err != nil { 277 - slog.Error("cursor routine failed", "err", err) 278 - } 279 - }() 280 - 281 - if err := fc.Run(ctx); err != nil { 282 - return fmt.Errorf("failure consuming and processing firehose: %w", err) 283 - } 284 - } 285 - 286 264 // ozone event consumer (if configured) 287 265 if srv.Engine.OzoneClient != nil { 288 266 oc := consumer.OzoneConsumer{ ··· 313 291 panic(fmt.Errorf("failed to start metrics endpoint: %w", err)) 314 292 } 315 293 }() 294 + 295 + // firehose event consumer (note this is actually mandatory) 296 + relayHost := cctx.String("atp-relay-host") 297 + if relayHost != "" { 298 + fc := consumer.FirehoseConsumer{ 299 + Engine: srv.Engine, 300 + Logger: logger.With("subsystem", "firehose-consumer"), 301 + Host: cctx.String("atp-relay-host"), 302 + Parallelism: cctx.Int("firehose-parallelism"), 303 + RedisClient: srv.RedisClient, 304 + } 305 + 306 + go func() { 307 + if err := fc.RunPersistCursor(ctx); err != nil { 308 + slog.Error("cursor routine failed", "err", err) 309 + } 310 + }() 311 + 312 + if err := fc.Run(ctx); err != nil { 313 + return fmt.Errorf("failure consuming and processing firehose: %w", err) 314 + } 315 + } 316 316 317 317 return nil 318 318 },