this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Collectiondir Improvements (#1029)

Perhaps best reviewed by looking at each individual commit.

All of these were found by running this, then sending a ctrl+c:

```bash
go run -race ./cmd/collectiondir serve --pebble ~/local/data/collectiondir --dau-directory ~/local/data/collectiondir
```

This PR improves a few small but important things about the
collectiondir service:

1. This service does not have pprof on the metrics router
(f16258882a1c6a82c25465547ae824afbeec9486)
a. `/metrics` and `/debug/pprof/` now live side-by-side
b. All services should have pprof enabled on private metrics routers
2. Fixes a race condition on startup where we're binding a bunch of
fields on `cs` after launching a goroutine that uses some of those
fields (82656e88f5ed5ff706c00cfabb98701c3e8e6d0a)
3. Fixes an issue where sending a SIGTERM to the service causes it to
start shutting down, but the service never actually dies because there
is an infinite hang due to an errant `cs.wg.Wait()`
(8c00259a165833ad0e8e8870691b5f55e3725ea8)
a. In general though, the `errchan` is not a good approach. Prefer to
let each background goroutine do its logging and only coordinate on
shutdown timing through a single variable (`cs.wg`)
4. Fix a race condition on the API server startup/shutdown
(5191ddd755928d5763cee92d7510c5999583d006)
5. Fix a race condition on the metrics server startup/shutdown
(c90e279b78482bea54451cbf6d0520faa432fa8d)

authored by

Jim Calabro and committed by
GitHub
d5c9745a c28d3a9a

+65 -48
+65 -48
cmd/collectiondir/serve.go
··· 5 5 "context" 6 6 "encoding/csv" 7 7 "encoding/json" 8 + "errors" 8 9 "fmt" 9 10 "log/slog" 10 11 "net" 11 12 "net/http" 13 + _ "net/http/pprof" 12 14 "net/url" 13 15 "os" 14 16 "os/signal" ··· 193 195 if err != nil { 194 196 return fmt.Errorf("lru init, %w", err) 195 197 } 196 - cs.wg.Add(1) 197 - go cs.ingestReceiver() 198 198 cs.log = log 199 199 cs.ctx = cctx.Context 200 200 cs.AdminToken = cctx.String("admin-token") 201 201 cs.ExepctedAuthHeader = "Bearer " + cs.AdminToken 202 + cs.wg.Add(1) 203 + go cs.ingestReceiver() 202 204 pebblePath := cctx.String("pebble") 203 205 cs.pcd = &PebbleCollectionDirectory{ 204 206 log: cs.log, ··· 215 217 } 216 218 } 217 219 cs.statsCacheFresh.L = &cs.statsCacheLock 218 - errchan := make(chan error, 3) 219 - apiAddr := cctx.String("api-listen") 220 + 221 + apiServerEcho, err := cs.createApiServer(cctx.Context, cctx.String("api-listen")) 222 + if err != nil { 223 + return err 224 + } 220 225 cs.wg.Add(1) 221 - go func() { 222 - errchan <- cs.StartApiServer(cctx.Context, apiAddr) 223 - }() 224 - metricsAddr := cctx.String("metrics-listen") 226 + go func() { cs.StartApiServer(cctx.Context, apiServerEcho) }() 227 + 228 + cs.createMetricsServer(cctx.String("metrics-listen")) 225 229 cs.wg.Add(1) 226 - go func() { 227 - errchan <- cs.StartMetricsServer(cctx.Context, metricsAddr) 228 - }() 230 + go func() { cs.StartMetricsServer(cctx.Context) }() 229 231 230 232 upstream := cctx.String("upstream") 231 233 if upstream != "" { ··· 247 249 go cs.handleFirehose(fhevents) 248 250 } 249 251 250 - select { 251 - case <-signals: 252 - log.Info("received shutdown signal") 253 - go errchanlog(cs.log, "server error", errchan) 254 - return cs.Shutdown() 255 - case err := <-errchan: 256 - if err != nil { 257 - log.Error("server error", "err", err) 258 - go errchanlog(cs.log, "server error", errchan) 259 - return cs.Shutdown() 260 - } 261 - } 262 - return nil 252 + <-signals 253 + log.Info("received shutdown signal") 254 + return cs.Shutdown() 263 255 } 264 256 265 257 func (cs *collectionServer) openDau() error { ··· 282 274 return nil 283 275 } 284 276 285 - func errchanlog(log *slog.Logger, msg string, errchan <-chan error) { 286 - for err := range errchan { 287 - log.Error(msg, "err", err) 288 - } 289 - } 290 - 291 277 func (cs *collectionServer) Shutdown() error { 292 278 close(cs.shutdown) 293 - go func() { 279 + 280 + func() { 281 + ctx, cancel := context.WithTimeout(context.Background(), time.Second) 282 + defer cancel() 283 + 294 284 cs.log.Info("metrics shutdown start") 295 - sherr := cs.metricsServer.Shutdown(context.Background()) 285 + sherr := cs.metricsServer.Shutdown(ctx) 296 286 cs.log.Info("metrics shutdown", "err", sherr) 297 287 }() 298 - cs.log.Info("api shutdown start...") 299 - err := cs.apiServer.Shutdown(context.Background()) 300 - cs.log.Info("api shutdown, thread wait...", "err", err) 301 - cs.wg.Wait() 288 + 289 + func() { 290 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 291 + defer cancel() 292 + 293 + cs.log.Info("api shutdown start...") 294 + err := cs.apiServer.Shutdown(ctx) 295 + cs.log.Info("api shutdown, thread wait...", "err", err) 296 + }() 297 + 302 298 cs.log.Info("threads done, db close...") 303 - ee := cs.pcd.Close() 304 - if ee != nil { 305 - cs.log.Error("failed to shutdown pebble", "err", ee) 299 + err := cs.pcd.Close() 300 + if err != nil { 301 + cs.log.Error("failed to shutdown pebble", "err", err) 306 302 } 307 303 cs.log.Info("db done. done.") 304 + cs.wg.Wait() 308 305 return err 309 306 } 310 307 ··· 384 381 } 385 382 } 386 383 387 - func (cs *collectionServer) StartMetricsServer(ctx context.Context, addr string) error { 388 - defer cs.wg.Done() 389 - defer cs.log.Info("metrics server exit") 384 + func (cs *collectionServer) createMetricsServer(addr string) { 385 + e := echo.New() 386 + e.GET("/metrics", echo.WrapHandler(promhttp.Handler())) 387 + e.Any("/debug/pprof/*", echo.WrapHandler(http.DefaultServeMux)) 388 + 390 389 cs.metricsServer = &http.Server{ 391 390 Addr: addr, 392 - Handler: promhttp.Handler(), 391 + Handler: e, 393 392 } 394 - return cs.metricsServer.ListenAndServe() 395 393 } 396 394 397 - func (cs *collectionServer) StartApiServer(ctx context.Context, addr string) error { 395 + func (cs *collectionServer) StartMetricsServer(ctx context.Context) { 398 396 defer cs.wg.Done() 399 - defer cs.log.Info("api server exit") 397 + defer cs.log.Info("metrics server exit") 398 + 399 + err := cs.metricsServer.ListenAndServe() 400 + if err != nil && !errors.Is(err, http.ErrServerClosed) { 401 + slog.Error("error in metrics server", "err", err) 402 + os.Exit(1) 403 + } 404 + } 405 + 406 + func (cs *collectionServer) createApiServer(ctx context.Context, addr string) (*echo.Echo, error) { 400 407 var lc net.ListenConfig 401 408 li, err := lc.Listen(ctx, "tcp", addr) 402 409 if err != nil { 403 - return err 410 + return nil, err 404 411 } 405 412 e := echo.New() 406 413 e.HideBanner = true ··· 430 437 Handler: e, 431 438 } 432 439 cs.apiServer = srv 433 - return srv.Serve(li) 440 + return e, nil 441 + } 442 + 443 + func (cs *collectionServer) StartApiServer(ctx context.Context, e *echo.Echo) { 444 + defer cs.wg.Done() 445 + defer cs.log.Info("api server exit") 446 + err := cs.apiServer.Serve(e.Listener) 447 + if err != nil && !errors.Is(err, http.ErrServerClosed) { 448 + slog.Error("error in api server", "err", err) 449 + os.Exit(1) 450 + } 434 451 } 435 452 436 453 const statsCacheDuration = time.Second * 300