this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add pagination to DBPlayback and some metrics (#240)

DB utilization gets really high for some reason if we're not paginating
the playback and have a lot of events to playback. This _might_ help?

authored by

Jaz and committed by
GitHub
7467a49d 19cef9c7

+128 -22
+2
bgs/bgs.go
··· 233 233 Format: "method=${method}, uri=${uri}, status=${status} latency=${latency_human}\n", 234 234 })) 235 235 236 + e.Use(MetricsMiddleware) 237 + 236 238 // React uses a virtual router, so we need to serve the index.html for all 237 239 // routes that aren't otherwise handled or in the /assets directory. 238 240 e.File("/dash", "/public/index.html")
+93
bgs/metrics.go
··· 1 1 package bgs 2 2 3 3 import ( 4 + "errors" 5 + "net/http" 6 + "strconv" 7 + "time" 8 + 9 + "github.com/labstack/echo/v4" 4 10 "github.com/prometheus/client_golang/prometheus" 5 11 "github.com/prometheus/client_golang/prometheus/promauto" 6 12 ) ··· 24 30 Name: "events_sent_counter", 25 31 Help: "The total number of events sent to consumers", 26 32 }, []string{"remote_addr", "user_agent"}) 33 + 34 + var reqSz = promauto.NewHistogramVec(prometheus.HistogramOpts{ 35 + Name: "http_request_size_bytes", 36 + Help: "A histogram of request sizes for requests.", 37 + Buckets: prometheus.ExponentialBuckets(100, 10, 8), 38 + }, []string{"code", "method", "path"}) 39 + 40 + var reqDur = promauto.NewHistogramVec(prometheus.HistogramOpts{ 41 + Name: "http_request_duration_seconds", 42 + Help: "A histogram of latencies for requests.", 43 + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), 44 + }, []string{"code", "method", "path"}) 45 + 46 + var reqCnt = promauto.NewCounterVec(prometheus.CounterOpts{ 47 + Name: "http_requests_total", 48 + Help: "A counter for requests to the wrapped handler.", 49 + }, []string{"code", "method", "path"}) 50 + 51 + var resSz = promauto.NewHistogramVec(prometheus.HistogramOpts{ 52 + Name: "http_response_size_bytes", 53 + Help: "A histogram of response sizes for requests.", 54 + Buckets: prometheus.ExponentialBuckets(100, 10, 8), 55 + }, []string{"code", "method", "path"}) 56 + 57 + // MetricsMiddleware defines handler function for metrics middleware 58 + func MetricsMiddleware(next echo.HandlerFunc) echo.HandlerFunc { 59 + return func(c echo.Context) error { 60 + path := c.Path() 61 + if path == "/metrics" || path == "/_health" { 62 + return next(c) 63 + } 64 + 65 + start := time.Now() 66 + requestSize := computeApproximateRequestSize(c.Request()) 67 + 68 + err := next(c) 69 + 70 + status := c.Response().Status 71 + if err != nil { 72 + var httpError *echo.HTTPError 73 + if errors.As(err, &httpError) { 74 + status = httpError.Code 75 + } 76 + if status == 0 || status == http.StatusOK { 77 + status = http.StatusInternalServerError 78 + } 79 + } 80 + 81 + elapsed := float64(time.Since(start)) / float64(time.Second) 82 + 83 + statusStr := strconv.Itoa(status) 84 + method := c.Request().Method 85 + 86 + responseSize := float64(c.Response().Size) 87 + 88 + reqDur.WithLabelValues(statusStr, method, path).Observe(elapsed) 89 + reqCnt.WithLabelValues(statusStr, method, path).Inc() 90 + reqSz.WithLabelValues(statusStr, method, path).Observe(float64(requestSize)) 91 + resSz.WithLabelValues(statusStr, method, path).Observe(responseSize) 92 + 93 + return err 94 + } 95 + } 96 + 97 + func computeApproximateRequestSize(r *http.Request) int { 98 + s := 0 99 + if r.URL != nil { 100 + s = len(r.URL.Path) 101 + } 102 + 103 + s += len(r.Method) 104 + s += len(r.Proto) 105 + for name, values := range r.Header { 106 + s += len(name) 107 + for _, value := range values { 108 + s += len(value) 109 + } 110 + } 111 + s += len(r.Host) 112 + 113 + // N.B. r.Form and r.MultipartForm are assumed to be included in r.URL. 114 + 115 + if r.ContentLength != -1 { 116 + s += int(r.ContentLength) 117 + } 118 + return s 119 + }
+1 -1
carstore/bs.go
··· 67 67 ID uint `gorm:"primarykey"` 68 68 CreatedAt time.Time 69 69 70 - Root models.DbCID 70 + Root models.DbCID `gorm:"index"` 71 71 DataStart int64 72 72 Seq int `gorm:"index"` 73 73 Path string
+32 -21
events/dbpersist.go
··· 288 288 } 289 289 290 290 func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error { 291 - rows, err := p.db.Model(&RepoEventRecord{}).Where("seq > ?", since).Order("seq asc").Rows() 292 - if err != nil { 293 - return err 294 - } 295 - defer rows.Close() 296 - 297 - // Batch events into groups of 100 and hydrate them in parallel. 298 - // Join the hydrated events back into a single stream in order and pass them to the callback. 291 + pageSize := 1000 299 292 300 - batch := make([]*RepoEventRecord, 0, p.batchOptions.PlaybackBatchSize) 301 - for rows.Next() { 302 - var evt RepoEventRecord 303 - if err := p.db.ScanRows(rows, &evt); err != nil { 304 - // Handle error 293 + for { 294 + rows, err := p.db.Model(&RepoEventRecord{}).Where("seq > ?", since).Order("seq asc").Limit(pageSize).Rows() 295 + if err != nil { 305 296 return err 306 297 } 298 + defer rows.Close() 307 299 308 - batch = append(batch, &evt) 300 + hasRows := false 301 + 302 + batch := make([]*RepoEventRecord, 0, p.batchOptions.PlaybackBatchSize) 303 + for rows.Next() { 304 + hasRows = true 305 + 306 + var evt RepoEventRecord 307 + if err := p.db.ScanRows(rows, &evt); err != nil { 308 + return err 309 + } 310 + 311 + // Advance the since cursor 312 + since = int64(evt.Seq) 313 + 314 + batch = append(batch, &evt) 315 + 316 + if len(batch) >= p.batchOptions.PlaybackBatchSize { 317 + if err := p.hydrateBatch(ctx, batch, cb); err != nil { 318 + return err 319 + } 309 320 310 - if len(batch) >= p.batchOptions.PlaybackBatchSize { 321 + batch = batch[:0] 322 + } 323 + } 324 + 325 + if len(batch) > 0 { 311 326 if err := p.hydrateBatch(ctx, batch, cb); err != nil { 312 327 return err 313 328 } 314 - 315 - batch = batch[:0] 316 329 } 317 - } 318 330 319 - if len(batch) > 0 { 320 - if err := p.hydrateBatch(ctx, batch, cb); err != nil { 321 - return err 331 + if !hasRows { 332 + break 322 333 } 323 334 } 324 335