this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add Querycheck: a service to check the plan and execution time of DB Queries (#259)

Querycheck provides a continuous, live-configured, query plan checking
tool.

Queries can be added/updated/removed at-runtime to allow for use in
investigation of DB performance issues in-vivo.

A `/metrics` endpoint exposes a large set of Prometheus metrics for all
executed queries which can be scraped and graphed in tailored dashboards
based on the needs and behavior of each query.

authored by

Jaz and committed by
GitHub
3016bf14 47aaeaab

+884 -3
+183
cmd/querycheck/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log" 7 + "os" 8 + "os/signal" 9 + "sync" 10 + "syscall" 11 + 12 + "net/http" 13 + _ "net/http/pprof" 14 + 15 + "github.com/bluesky-social/indigo/querycheck" 16 + "github.com/bluesky-social/indigo/util/tracing" 17 + "github.com/labstack/echo-contrib/pprof" 18 + "github.com/labstack/echo/v4" 19 + 20 + "github.com/labstack/echo/v4/middleware" 21 + 22 + "github.com/prometheus/client_golang/prometheus/promhttp" 23 + "go.opentelemetry.io/otel/trace" 24 + "go.uber.org/zap" 25 + 26 + "github.com/urfave/cli/v2" 27 + ) 28 + 29 + func main() { 30 + app := cli.App{ 31 + Name: "querycheck", 32 + Usage: "a postgresql query plan checker", 33 + Version: "0.0.1", 34 + } 35 + 36 + app.Flags = []cli.Flag{ 37 + &cli.StringFlag{ 38 + Name: "postgres-url", 39 + Usage: "postgres url for storing events", 40 + Value: "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable", 41 + EnvVars: []string{"POSTGRES_URL"}, 42 + }, 43 + &cli.IntFlag{ 44 + Name: "port", 45 + Usage: "port to serve metrics on", 46 + Value: 8080, 47 + EnvVars: []string{"PORT"}, 48 + }, 49 + &cli.StringFlag{ 50 + Name: "auth-token", 51 + Usage: "auth token for accessing the querycheck api", 52 + Value: "", 53 + EnvVars: []string{"AUTH_TOKEN"}, 54 + }, 55 + } 56 + 57 + app.Action = Querycheck 58 + 59 + err := app.Run(os.Args) 60 + if err != nil { 61 + log.Fatal(err) 62 + } 63 + } 64 + 65 + var tracer trace.Tracer 66 + 67 + // Querycheck is the main function for querycheck 68 + func Querycheck(cctx *cli.Context) error { 69 + ctx := cctx.Context 70 + ctx, cancel := context.WithCancel(ctx) 71 + defer cancel() 72 + 73 + // Trap SIGINT to trigger a shutdown. 74 + signals := make(chan os.Signal, 1) 75 + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 76 + 77 + rawlog, err := zap.NewDevelopment() 78 + if err != nil { 79 + log.Fatalf("failed to create logger: %+v\n", err) 80 + } 81 + defer func() { 82 + log.Printf("main function teardown\n") 83 + err := rawlog.Sync() 84 + if err != nil { 85 + log.Printf("failed to sync logger on teardown: %+v", err.Error()) 86 + } 87 + }() 88 + 89 + log := rawlog.Sugar().With("source", "querycheck_main") 90 + 91 + log.Info("starting querycheck") 92 + 93 + // Registers a tracer Provider globally if the exporter endpoint is set 94 + if os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") != "" { 95 + log.Info("initializing tracer...") 96 + shutdown, err := tracing.InstallExportPipeline(ctx, "Querycheck", 1) 97 + if err != nil { 98 + log.Fatal(err) 99 + } 100 + defer func() { 101 + if err := shutdown(ctx); err != nil { 102 + log.Fatal(err) 103 + } 104 + }() 105 + } 106 + 107 + wg := sync.WaitGroup{} 108 + 109 + // HTTP Server setup and Middleware Plumbing 110 + e := echo.New() 111 + e.HideBanner = true 112 + e.HidePort = true 113 + pprof.Register(e) 114 + e.GET("/metrics", echo.WrapHandler(promhttp.Handler())) 115 + e.Use(middleware.LoggerWithConfig(middleware.DefaultLoggerConfig)) 116 + 117 + // Start the query checker 118 + querychecker, err := querycheck.NewQuerychecker(ctx, cctx.String("postgres-url")) 119 + if err != nil { 120 + log.Fatalf("failed to create querychecker: %+v\n", err) 121 + } 122 + 123 + // getLikeCountQuery := `SELECT * 124 + // FROM like_counts 125 + // WHERE actor_did = 'did:plc:q6gjnaw2blty4crticxkmujt' 126 + // AND ns = 'app.bsky.feed.post' 127 + // AND rkey = '3k3jf5lgbsw24' 128 + // LIMIT 1;` 129 + 130 + // querychecker.AddQuery(ctx, "get_like_count", getLikeCountQuery, time.Second*20) 131 + 132 + err = querychecker.Start() 133 + if err != nil { 134 + log.Fatalf("failed to start querychecker: %+v\n", err) 135 + } 136 + 137 + e.Use(func(next echo.HandlerFunc) echo.HandlerFunc { 138 + return func(c echo.Context) error { 139 + if cctx.String("auth-token") != "" && c.Request().Header.Get("Authorization") != cctx.String("auth-token") { 140 + return c.String(http.StatusUnauthorized, "unauthorized") 141 + } 142 + return next(c) 143 + } 144 + }) 145 + 146 + e.GET("/query", querychecker.HandleGetQuery) 147 + e.GET("/queries", querychecker.HandleGetQueries) 148 + e.POST("/query", querychecker.HandleAddQuery) 149 + e.PUT("/query", querychecker.HandleUpdateQuery) 150 + e.DELETE("/query", querychecker.HandleDeleteQuery) 151 + 152 + // Start the metrics server 153 + wg.Add(1) 154 + go func() { 155 + log.Infof("starting metrics server on port %d", cctx.Int("port")) 156 + if err := e.Start(fmt.Sprintf(":%d", cctx.Int("port"))); err != nil { 157 + log.Errorf("failed to start metrics server: %+v\n", err) 158 + } 159 + wg.Done() 160 + }() 161 + 162 + select { 163 + case <-signals: 164 + cancel() 165 + fmt.Println("shutting down on signal") 166 + case <-ctx.Done(): 167 + fmt.Println("shutting down on context done") 168 + } 169 + 170 + log.Info("shutting down, waiting for workers to clean up...") 171 + 172 + if err := e.Shutdown(ctx); err != nil { 173 + log.Errorf("failed to shut down metrics server: %+v\n", err) 174 + wg.Done() 175 + } 176 + 177 + querychecker.Stop() 178 + 179 + wg.Wait() 180 + log.Info("shut down successfully") 181 + 182 + return nil 183 + }
+3 -3
go.mod
··· 26 26 github.com/ipfs/go-log/v2 v2.5.1 27 27 github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 28 28 github.com/ipld/go-car/v2 v2.9.0 29 + github.com/jackc/pgx/v5 v5.3.0 29 30 github.com/joho/godotenv v1.5.1 30 31 github.com/labstack/echo-contrib v0.15.0 31 32 github.com/labstack/echo/v4 v4.10.2 ··· 45 46 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 46 47 go.opentelemetry.io/otel v1.16.0 47 48 go.opentelemetry.io/otel/exporters/jaeger v1.14.0 49 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 48 50 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.16.0 49 51 go.opentelemetry.io/otel/sdk v1.16.0 52 + go.opentelemetry.io/otel/trace v1.16.0 50 53 go.uber.org/automaxprocs v1.5.3 51 54 go.uber.org/zap v1.24.0 52 55 golang.org/x/crypto v0.11.0 ··· 94 97 github.com/ipld/go-ipld-prime v0.20.0 // indirect 95 98 github.com/jackc/pgpassfile v1.0.0 // indirect 96 99 github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect 97 - github.com/jackc/pgx/v5 v5.3.0 // indirect 98 100 github.com/jbenet/goprocess v0.1.4 // indirect 99 101 github.com/jinzhu/inflection v1.0.0 // indirect 100 102 github.com/jinzhu/now v1.1.5 // indirect ··· 131 133 github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect 132 134 go.opencensus.io v0.24.0 // indirect 133 135 go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect 134 - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect 135 136 go.opentelemetry.io/otel/metric v1.16.0 // indirect 136 - go.opentelemetry.io/otel/trace v1.16.0 // indirect 137 137 go.opentelemetry.io/proto/otlp v0.19.0 // indirect 138 138 go.uber.org/atomic v1.10.0 // indirect 139 139 go.uber.org/multierr v1.11.0 // indirect
+323
querycheck/check.go
··· 1 + package querycheck 2 + 3 + import ( 4 + "context" 5 + "log" 6 + "math" 7 + "sync" 8 + "time" 9 + 10 + "github.com/jackc/pgx/v5" 11 + "go.opentelemetry.io/otel" 12 + "go.opentelemetry.io/otel/attribute" 13 + "go.uber.org/zap" 14 + ) 15 + 16 + var tracer = otel.Tracer("querycheck") 17 + 18 + // Query is a query to check 19 + type Query struct { 20 + Name string 21 + Query string 22 + LatestPlan *QueryPlan 23 + PreviousPlan *QueryPlan 24 + LastChecked time.Time 25 + LastError error 26 + CheckEvery time.Duration 27 + 28 + lk sync.RWMutex 29 + in chan struct{} 30 + out chan struct{} 31 + } 32 + 33 + // Querychecker is a query checker meta object 34 + type Querychecker struct { 35 + Queries []*Query 36 + Logger *zap.SugaredLogger 37 + 38 + connectionURL string 39 + lk sync.RWMutex 40 + } 41 + 42 + // NewQuerychecker creates a new querychecker 43 + func NewQuerychecker(ctx context.Context, connectionURL string) (*Querychecker, error) { 44 + logger, err := zap.NewDevelopment() 45 + if err != nil { 46 + return nil, err 47 + } 48 + l := logger.Sugar().With("source", "querychecker_manager") 49 + 50 + return &Querychecker{ 51 + connectionURL: connectionURL, 52 + Logger: l, 53 + Queries: []*Query{}, 54 + }, nil 55 + } 56 + 57 + // AddQuery adds a query to the checker 58 + func (q *Querychecker) AddQuery(ctx context.Context, name, query string, checkEvery time.Duration) { 59 + ctx, span := tracer.Start(ctx, "AddQuery") 60 + defer span.End() 61 + 62 + span.SetAttributes(attribute.String("name", name)) 63 + span.SetAttributes(attribute.String("query", query)) 64 + span.SetAttributes(attribute.String("checkEvery", checkEvery.String())) 65 + 66 + q.lk.Lock() 67 + q.Queries = append(q.Queries, &Query{ 68 + Name: name, 69 + Query: query, 70 + CheckEvery: checkEvery, 71 + 72 + in: make(chan struct{}), 73 + out: make(chan struct{}), 74 + }) 75 + q.lk.Unlock() 76 + } 77 + 78 + // RemoveQuery removes a query from the checker 79 + func (q *Querychecker) RemoveQuery(ctx context.Context, name string) { 80 + ctx, span := tracer.Start(ctx, "RemoveQuery") 81 + defer span.End() 82 + 83 + span.SetAttributes(attribute.String("name", name)) 84 + 85 + q.lk.Lock() 86 + defer q.lk.Unlock() 87 + for i, qu := range q.Queries { 88 + if qu.Name == name { 89 + q.Queries = append(q.Queries[:i], q.Queries[i+1:]...) 90 + return 91 + } 92 + } 93 + } 94 + 95 + // GetQuery returns a copy of the query 96 + func (q *Querychecker) GetQuery(ctx context.Context, name string) *Query { 97 + ctx, span := tracer.Start(ctx, "GetQuery") 98 + defer span.End() 99 + 100 + span.SetAttributes(attribute.String("name", name)) 101 + 102 + q.lk.RLock() 103 + defer q.lk.RUnlock() 104 + for _, qu := range q.Queries { 105 + if qu.Name == name { 106 + return &Query{ 107 + Name: qu.Name, 108 + Query: qu.Query, 109 + LatestPlan: qu.LatestPlan, 110 + PreviousPlan: qu.PreviousPlan, 111 + LastChecked: qu.LastChecked, 112 + LastError: qu.LastError, 113 + CheckEvery: qu.CheckEvery, 114 + } 115 + } 116 + } 117 + return nil 118 + } 119 + 120 + // GetQueries returns a copy of the queries 121 + func (q *Querychecker) GetQueries(ctx context.Context) []*Query { 122 + ctx, span := tracer.Start(ctx, "GetQueries") 123 + defer span.End() 124 + 125 + q.lk.RLock() 126 + defer q.lk.RUnlock() 127 + queries := make([]*Query, len(q.Queries)) 128 + for i, qu := range q.Queries { 129 + queries[i] = &Query{ 130 + Name: qu.Name, 131 + Query: qu.Query, 132 + LatestPlan: qu.LatestPlan, 133 + PreviousPlan: qu.PreviousPlan, 134 + LastChecked: qu.LastChecked, 135 + LastError: qu.LastError, 136 + CheckEvery: qu.CheckEvery, 137 + } 138 + } 139 + 140 + return queries 141 + } 142 + 143 + // UpdateQuery updates a query 144 + func (q *Querychecker) UpdateQuery(ctx context.Context, name, query string, checkEvery time.Duration) { 145 + ctx, span := tracer.Start(ctx, "UpdateQuery") 146 + defer span.End() 147 + 148 + span.SetAttributes(attribute.String("name", name)) 149 + span.SetAttributes(attribute.String("query", query)) 150 + span.SetAttributes(attribute.String("checkEvery", checkEvery.String())) 151 + 152 + for _, qu := range q.Queries { 153 + if qu.Name == name { 154 + qu.lk.Lock() 155 + qu.Query = query 156 + qu.CheckEvery = checkEvery 157 + qu.lk.Unlock() 158 + return 159 + } 160 + } 161 + } 162 + 163 + // Start starts the query checker routines 164 + func (q *Querychecker) Start() error { 165 + ctx, span := tracer.Start(context.Background(), "Start") 166 + defer span.End() 167 + 168 + for _, qu := range q.Queries { 169 + go func(query *Query) { 170 + rawlog, err := zap.NewDevelopment() 171 + if err != nil { 172 + log.Fatalf("failed to create logger: %+v\n", err) 173 + } 174 + log := rawlog.Sugar().With("source", "query_checker_routine", "query", query.Name) 175 + 176 + log.Infof("query checker routine started for query: %s\n", query.Name) 177 + log.Infof("Query: \n%s\n", query.Query) 178 + 179 + // Check the query plan every CheckEvery duration 180 + ticker := time.NewTicker(query.CheckEvery) 181 + defer ticker.Stop() 182 + 183 + query.LatestPlan, err = q.CheckQueryPlan(ctx, query.Query) 184 + if err != nil { 185 + log.Errorf("failed to check query plan: %+v\n", err) 186 + } 187 + 188 + if query.LatestPlan != nil { 189 + log.Infof("Initial plan:\n%+v\n", query.LatestPlan.String()) 190 + query.RecordPlanMetrics(*query.LatestPlan) 191 + query.LastChecked = time.Now() 192 + } 193 + 194 + for { 195 + select { 196 + case <-ticker.C: 197 + log.Info("checking query plan") 198 + 199 + query.lk.RLock() 200 + queryString := query.Query 201 + query.lk.RUnlock() 202 + 203 + qp, err := q.CheckQueryPlan(ctx, queryString) 204 + 205 + query.lk.Lock() 206 + query.LastChecked = time.Now() 207 + query.LastError = err 208 + query.lk.Unlock() 209 + 210 + execCounter.WithLabelValues(query.Name).Inc() 211 + 212 + if err != nil || qp == nil { 213 + if qp == nil { 214 + log.Errorf("query plan is nil") 215 + } 216 + log.Errorf("failed to check query plan: %+v\n", err) 217 + errorCounter.WithLabelValues(query.Name).Inc() 218 + continue 219 + } 220 + 221 + query.lk.RLock() 222 + lastPlan := *query.LatestPlan 223 + query.lk.RUnlock() 224 + 225 + query.RecordPlanMetrics(*qp) 226 + 227 + if !qp.HasSameStructureAs(lastPlan) { 228 + sign := "+" 229 + diff := math.Abs(lastPlan.Plan.ActualTotalTime - qp.Plan.ActualTotalTime) 230 + if lastPlan.Plan.ActualTotalTime > qp.Plan.ActualTotalTime { 231 + sign = "-" 232 + } 233 + 234 + log.Infof("query plan has changed (%s%.03fms): \n%+v\n", sign, diff, qp.String()) 235 + 236 + query.lk.Lock() 237 + query.PreviousPlan = query.LatestPlan 238 + query.LatestPlan = qp 239 + query.lk.Unlock() 240 + } 241 + case <-query.in: 242 + log.Info("shutting down query checker routine") 243 + query.out <- struct{}{} 244 + return 245 + } 246 + } 247 + }(qu) 248 + } 249 + 250 + return nil 251 + } 252 + 253 + // Stop stops the query checker routines 254 + func (q *Querychecker) Stop() { 255 + _, span := tracer.Start(context.Background(), "Stop") 256 + defer span.End() 257 + 258 + q.Logger.Info("stopping query checker") 259 + 260 + for _, qu := range q.Queries { 261 + qu.in <- struct{}{} 262 + } 263 + 264 + for _, qu := range q.Queries { 265 + <-qu.out 266 + } 267 + 268 + q.Logger.Info("query checker stopped") 269 + } 270 + 271 + // CheckQueryPlan checks the query plan for a given query 272 + func (q *Querychecker) CheckQueryPlan(ctx context.Context, query string) (*QueryPlan, error) { 273 + ctx, span := tracer.Start(ctx, "CheckQueryPlan") 274 + defer span.End() 275 + 276 + conn, err := pgx.Connect(ctx, q.connectionURL) 277 + if err != nil { 278 + return nil, err 279 + } 280 + defer conn.Close(ctx) 281 + 282 + rows, err := conn.Query(ctx, "EXPLAIN (ANALYZE, COSTS, VERBOSE, BUFFERS, FORMAT JSON) "+query) 283 + if err != nil { 284 + return nil, err 285 + } 286 + 287 + var plan QueryPlan 288 + 289 + for rows.Next() { 290 + var plans QueryPlans 291 + err := rows.Scan(&plans) 292 + if err != nil { 293 + return nil, err 294 + } 295 + for _, p := range plans { 296 + plan = p 297 + } 298 + } 299 + 300 + return &plan, nil 301 + } 302 + 303 + // RecordPlanMetrics records the query plan metrics 304 + func (qu *Query) RecordPlanMetrics(qp QueryPlan) { 305 + execDurationCounter.WithLabelValues(qu.Name).Add(qp.Plan.ActualTotalTime) 306 + blocksHitCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.SharedHitBlocks)) 307 + blocksReadCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.SharedReadBlocks)) 308 + blocksWrittenCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.SharedWrittenBlocks)) 309 + blocksDirtyCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.SharedDirtiedBlocks)) 310 + ioReadTimeCounter.WithLabelValues(qu.Name).Add(qp.Plan.IOReadTime) 311 + ioWriteTimeCounter.WithLabelValues(qu.Name).Add(qp.Plan.IOWriteTime) 312 + tempWrittenBlocksCounter.WithLabelValues(qu.Name).Add(float64(qp.Plan.TempWrittenBlocks)) 313 + 314 + qu.RecordPlanNode(qp.Plan) 315 + } 316 + 317 + // RecordPlanNode records the query plan node metrics 318 + func (qu *Query) RecordPlanNode(p Plan) { 319 + planNodeCounter.WithLabelValues(qu.Name, p.NodeType).Inc() 320 + for _, n := range p.Plans { 321 + qu.RecordPlanNode(n) 322 + } 323 + }
+98
querycheck/handlers.go
··· 1 + package querycheck 2 + 3 + import ( 4 + "time" 5 + 6 + "github.com/labstack/echo/v4" 7 + ) 8 + 9 + type RetQuery struct { 10 + Name string `json:"name"` 11 + Query string `json:"query"` 12 + LatestPlan *QueryPlan `json:"latest_plan"` 13 + PreviousPlan *QueryPlan `json:"previous_plan"` 14 + LastChecked time.Time `json:"last_checked"` 15 + LastError error `json:"last_error"` 16 + CheckEvery string `json:"check_every"` 17 + } 18 + 19 + func (q *Querychecker) HandleGetQueries(c echo.Context) error { 20 + queries := q.GetQueries(c.Request().Context()) 21 + retQueries := []RetQuery{} 22 + for _, query := range queries { 23 + retQueries = append(retQueries, RetQuery{ 24 + Name: query.Name, 25 + Query: query.Query, 26 + LatestPlan: query.LatestPlan, 27 + PreviousPlan: query.PreviousPlan, 28 + LastChecked: query.LastChecked, 29 + LastError: query.LastError, 30 + CheckEvery: query.CheckEvery.String(), 31 + }) 32 + } 33 + 34 + return c.JSON(200, retQueries) 35 + } 36 + 37 + func (q *Querychecker) HandleGetQuery(c echo.Context) error { 38 + query := q.GetQuery(c.Request().Context(), c.Param("name")) 39 + if query == nil { 40 + return c.JSON(404, echo.Map{ 41 + "message": "not found", 42 + }) 43 + } 44 + 45 + retQuery := RetQuery{ 46 + Name: query.Name, 47 + Query: query.Query, 48 + LatestPlan: query.LatestPlan, 49 + PreviousPlan: query.PreviousPlan, 50 + LastChecked: query.LastChecked, 51 + LastError: query.LastError, 52 + CheckEvery: query.CheckEvery.String(), 53 + } 54 + 55 + return c.JSON(200, retQuery) 56 + } 57 + 58 + func (q *Querychecker) HandleDeleteQuery(c echo.Context) error { 59 + q.RemoveQuery(c.Request().Context(), c.Param("name")) 60 + return c.JSON(200, echo.Map{ 61 + "message": "success", 62 + }) 63 + } 64 + 65 + type AddQueryRequest struct { 66 + Name string `json:"name"` 67 + Query string `json:"query"` 68 + CheckEvery int64 `json:"check_every_ms"` 69 + } 70 + 71 + func (q *Querychecker) HandleAddQuery(c echo.Context) error { 72 + var req AddQueryRequest 73 + c.Bind(&req) 74 + q.AddQuery(c.Request().Context(), req.Name, req.Query, time.Duration(req.CheckEvery)*time.Millisecond) 75 + return c.JSON(200, echo.Map{ 76 + "name": req.Name, 77 + "query": req.Query, 78 + "check_every_ms": req.CheckEvery, 79 + "message": "success", 80 + }) 81 + } 82 + 83 + type UpdateQueryRequest struct { 84 + Query string `json:"query"` 85 + CheckEvery int64 `json:"check_every_ms"` 86 + } 87 + 88 + func (q *Querychecker) HandleUpdateQuery(c echo.Context) error { 89 + var req UpdateQueryRequest 90 + c.Bind(&req) 91 + q.UpdateQuery(c.Request().Context(), c.Param("name"), req.Query, time.Duration(req.CheckEvery)*time.Millisecond) 92 + return c.JSON(200, echo.Map{ 93 + "name": c.Param("name"), 94 + "query": req.Query, 95 + "check_every_ms": req.CheckEvery, 96 + "message": "success", 97 + }) 98 + }
+61
querycheck/metrics.go
··· 1 + package querycheck 2 + 3 + import ( 4 + "github.com/prometheus/client_golang/prometheus" 5 + "github.com/prometheus/client_golang/prometheus/promauto" 6 + ) 7 + 8 + var execCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 9 + Name: "querycheck_exec_total", 10 + Help: "total number of executions since starting the querycheck", 11 + }, []string{"query"}) 12 + 13 + var execDurationCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 14 + Name: "querycheck_exec_duration_ms_total", 15 + Help: "total ms spent executing the query since starting the querycheck", 16 + }, []string{"query"}) 17 + 18 + var errorCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 19 + Name: "querycheck_errors_total", 20 + Help: "number of errors encountered since starting the querycheck", 21 + }, []string{"query"}) 22 + 23 + var blocksHitCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 24 + Name: "querycheck_blocks_hit_total", 25 + Help: "blocks hit total since starting the querycheck", 26 + }, []string{"query"}) 27 + 28 + var blocksReadCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 29 + Name: "querycheck_blocks_read_total", 30 + Help: "blocks read total since starting the querycheck", 31 + }, []string{"query"}) 32 + 33 + var blocksWrittenCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 34 + Name: "querycheck_blocks_written_total", 35 + Help: "blocks written total since starting the querycheck", 36 + }, []string{"query"}) 37 + 38 + var blocksDirtyCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 39 + Name: "querycheck_blocks_dirty_total", 40 + Help: "blocks dirty total since starting the querycheck", 41 + }, []string{"query"}) 42 + 43 + var ioReadTimeCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 44 + Name: "querycheck_io_read_ms_total", 45 + Help: "io read time (in ms) total since starting the querycheck", 46 + }, []string{"query"}) 47 + 48 + var ioWriteTimeCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 49 + Name: "querycheck_io_write_ms_total", 50 + Help: "io write time (in ms) total since starting the querycheck", 51 + }, []string{"query"}) 52 + 53 + var tempWrittenBlocksCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 54 + Name: "querycheck_temp_written_blocks_total", 55 + Help: "temp written blocks total since starting the querycheck", 56 + }, []string{"query"}) 57 + 58 + var planNodeCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 59 + Name: "querycheck_plan_node_count_total", 60 + Help: "plan node count total since starting the querycheck", 61 + }, []string{"query", "node_type"})
+166
querycheck/plan.go
··· 1 + package querycheck 2 + 3 + import "fmt" 4 + 5 + type Plan struct { 6 + NodeType string `json:"Node Type"` 7 + ParallelAware bool `json:"Parallel Aware"` 8 + AsyncCapable bool `json:"Async Capable"` 9 + StartupCost float64 `json:"Startup Cost"` 10 + TotalCost float64 `json:"Total Cost"` 11 + PlanRows int `json:"Plan Rows"` 12 + PlanWidth int `json:"Plan Width"` 13 + ActualStartupTime float64 `json:"Actual Startup Time"` 14 + ActualTotalTime float64 `json:"Actual Total Time"` 15 + ActualRows int `json:"Actual Rows"` 16 + ActualLoops int `json:"Actual Loops"` 17 + Output []string `json:"Output"` 18 + SharedHitBlocks int `json:"Shared Hit Blocks"` 19 + SharedReadBlocks int `json:"Shared Read Blocks"` 20 + SharedDirtiedBlocks int `json:"Shared Dirtied Blocks"` 21 + SharedWrittenBlocks int `json:"Shared Written Blocks"` 22 + LocalHitBlocks int `json:"Local Hit Blocks"` 23 + LocalReadBlocks int `json:"Local Read Blocks"` 24 + LocalDirtiedBlocks int `json:"Local Dirtied Blocks"` 25 + LocalWrittenBlocks int `json:"Local Written Blocks"` 26 + TempReadBlocks int `json:"Temp Read Blocks"` 27 + TempWrittenBlocks int `json:"Temp Written Blocks"` 28 + IOReadTime float64 `json:"I/O Read Time"` 29 + IOWriteTime float64 `json:"I/O Write Time"` 30 + Plans []Plan `json:"Plans,omitempty"` 31 + ParentRelationship string `json:"Parent Relationship,omitempty"` 32 + SortKey []string `json:"Sort Key,omitempty"` 33 + SortMethod string `json:"Sort Method,omitempty"` 34 + SortSpaceUsed int `json:"Sort Space Used,omitempty"` 35 + SortSpaceType string `json:"Sort Space Type,omitempty"` 36 + WorkersPlanned int `json:"Workers Planned,omitempty"` 37 + WorkersLaunched int `json:"Workers Launched,omitempty"` 38 + SingleCopy bool `json:"Single Copy,omitempty"` 39 + RelationName string `json:"Relation Name,omitempty"` 40 + Schema string `json:"Schema,omitempty"` 41 + Alias string `json:"Alias,omitempty"` 42 + Filter string `json:"Filter,omitempty"` 43 + RowsRemovedByFilter int `json:"Rows Removed by Filter,omitempty"` 44 + Workers []Worker `json:"Workers,omitempty"` 45 + } 46 + 47 + type Worker struct { 48 + WorkerNumber int `json:"Worker Number"` 49 + ActualStartupTime float64 `json:"Actual Startup Time"` 50 + ActualTotalTime float64 `json:"Actual Total Time"` 51 + ActualRows int `json:"Actual Rows"` 52 + ActualLoops int `json:"Actual Loops"` 53 + JIT JIT `json:"JIT"` 54 + SharedHitBlocks int `json:"Shared Hit Blocks"` 55 + SharedReadBlocks int `json:"Shared Read Blocks"` 56 + SharedDirtiedBlocks int `json:"Shared Dirtied Blocks"` 57 + SharedWrittenBlocks int `json:"Shared Written Blocks"` 58 + LocalHitBlocks int `json:"Local Hit Blocks"` 59 + LocalReadBlocks int `json:"Local Read Blocks"` 60 + LocalDirtiedBlocks int `json:"Local Dirtied Blocks"` 61 + LocalWrittenBlocks int `json:"Local Written Blocks"` 62 + TempReadBlocks int `json:"Temp Read Blocks"` 63 + TempWrittenBlocks int `json:"Temp Written Blocks"` 64 + IOReadTime float64 `json:"I/O Read Time"` 65 + IOWriteTime float64 `json:"I/O Write Time"` 66 + } 67 + 68 + type JIT struct { 69 + Functions int `json:"Functions"` 70 + Options Options `json:"Options"` 71 + Timing Timing `json:"Timing"` 72 + } 73 + 74 + type Options struct { 75 + Inlining bool `json:"Inlining"` 76 + Optimization bool `json:"Optimization"` 77 + Expressions bool `json:"Expressions"` 78 + Deforming bool `json:"Deforming"` 79 + } 80 + 81 + type Timing struct { 82 + Generation float64 `json:"Generation"` 83 + Inlining float64 `json:"Inlining"` 84 + Optimization float64 `json:"Optimization"` 85 + Emission float64 `json:"Emission"` 86 + Total float64 `json:"Total"` 87 + } 88 + 89 + type QueryPlan struct { 90 + Plan Plan `json:"Plan"` 91 + QueryIdentifier int64 `json:"Query Identifier"` 92 + Planning Planning `json:"Planning"` 93 + PlanningTime float64 `json:"Planning Time"` 94 + Triggers []string `json:"Triggers"` 95 + JIT JIT `json:"JIT"` 96 + ExecutionTime float64 `json:"Execution Time"` 97 + } 98 + 99 + type Planning struct { 100 + SharedHitBlocks int `json:"Shared Hit Blocks"` 101 + SharedReadBlocks int `json:"Shared Read Blocks"` 102 + SharedDirtiedBlocks int `json:"Shared Dirtied Blocks"` 103 + SharedWrittenBlocks int `json:"Shared Written Blocks"` 104 + LocalHitBlocks int `json:"Local Hit Blocks"` 105 + LocalReadBlocks int `json:"Local Read Blocks"` 106 + LocalDirtiedBlocks int `json:"Local Dirtied Blocks"` 107 + LocalWrittenBlocks int `json:"Local Written Blocks"` 108 + TempReadBlocks int `json:"Temp Read Blocks"` 109 + TempWrittenBlocks int `json:"Temp Written Blocks"` 110 + IOReadTime float64 `json:"I/O Read Time"` 111 + IOWriteTime float64 `json:"I/O Write Time"` 112 + } 113 + 114 + type QueryPlans []QueryPlan 115 + 116 + func (q *QueryPlan) String() string { 117 + ret := "" 118 + ret += q.Plan.String(1) 119 + return ret 120 + } 121 + 122 + func (p *Plan) String(i int) string { 123 + ret := "" 124 + ret += fmt.Sprintf("(%s) Timing: %fms | IO Read: %fms (H %d R %d D %d W %d) | IO Write: %fms", 125 + p.NodeType, 126 + p.ActualTotalTime, 127 + p.IOReadTime, 128 + p.SharedHitBlocks, 129 + p.SharedReadBlocks, 130 + p.SharedWrittenBlocks, 131 + p.SharedDirtiedBlocks, 132 + p.IOWriteTime, 133 + ) 134 + 135 + for _, plan := range p.Plans { 136 + ret += "\n" 137 + for j := 0; j < i; j++ { 138 + ret += "\t" 139 + } 140 + ret += plan.String(i + 1) 141 + } 142 + 143 + return ret 144 + } 145 + 146 + func (q *QueryPlan) HasSameStructureAs(other QueryPlan) bool { 147 + return q.Plan.HasSameStructureAs(other.Plan) 148 + } 149 + 150 + func (p *Plan) HasSameStructureAs(other Plan) bool { 151 + if p.NodeType != other.NodeType { 152 + return false 153 + } 154 + 155 + if len(p.Plans) != len(other.Plans) { 156 + return false 157 + } 158 + 159 + for i, plan := range p.Plans { 160 + if !plan.HasSameStructureAs(other.Plans[i]) { 161 + return false 162 + } 163 + } 164 + 165 + return true 166 + }
+50
util/tracing/tracing.go
··· 1 + package tracing 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + 7 + "go.opentelemetry.io/otel" 8 + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" 9 + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" 10 + "go.opentelemetry.io/otel/sdk/resource" 11 + sdktrace "go.opentelemetry.io/otel/sdk/trace" 12 + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" 13 + ) 14 + 15 + func InstallExportPipeline(ctx context.Context, serviceName string, sampleRatio float64) (func(context.Context) error, error) { 16 + client := otlptracehttp.NewClient() 17 + exporter, err := otlptrace.New(ctx, client) 18 + if err != nil { 19 + return nil, fmt.Errorf("creating OTLP trace exporter: %w", err) 20 + } 21 + 22 + tracerProvider := newTraceProvider(exporter, serviceName, sampleRatio) 23 + otel.SetTracerProvider(tracerProvider) 24 + 25 + return tracerProvider.Shutdown, nil 26 + } 27 + 28 + func newTraceProvider(exp sdktrace.SpanExporter, serviceName string, sampleRatio float64) *sdktrace.TracerProvider { 29 + // Ensure default SDK resources and the required service name are set. 30 + r, err := resource.Merge( 31 + resource.Default(), 32 + resource.NewWithAttributes( 33 + semconv.SchemaURL, 34 + semconv.ServiceName(serviceName), 35 + ), 36 + ) 37 + 38 + if err != nil { 39 + panic(err) 40 + } 41 + 42 + // initialize the traceIDRatioBasedSampler 43 + traceIDRatioBasedSampler := sdktrace.TraceIDRatioBased(sampleRatio) 44 + 45 + return sdktrace.NewTracerProvider( 46 + sdktrace.WithSampler(traceIDRatioBasedSampler), 47 + sdktrace.WithBatcher(exp), 48 + sdktrace.WithResource(r), 49 + ) 50 + }