this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Batch cursor updates for PDSs (#245)

Bumping the cursor in the DB for every event has become a bottleneck on
event throughput.

To ameliorate this, we can batch cursor updates and flush them to the DB
every 10 seconds.


![image](https://github.com/bluesky-social/indigo/assets/1617325/2b37cd64-15d6-4a82-9381-db9a1da0508b)

authored by

Jaz and committed by
GitHub
9108e264 314d14b4

+284 -161
+4
bgs/bgs.go
··· 316 316 return e.StartServer(srv) 317 317 } 318 318 319 + func (bgs *BGS) Shutdown() []error { 320 + return bgs.slurper.Shutdown() 321 + } 322 + 319 323 type HealthStatus struct { 320 324 Status string `json:"status"` 321 325 Message string `json:"msg,omitempty"`
+98 -17
bgs/fedmgr.go
··· 11 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 12 "github.com/bluesky-social/indigo/events" 13 13 "github.com/bluesky-social/indigo/models" 14 + "go.opentelemetry.io/otel" 14 15 15 16 "github.com/gorilla/websocket" 16 17 "gorm.io/gorm" ··· 28 29 active map[string]*activeSub 29 30 30 31 newSubsDisabled bool 32 + 33 + shutdownChan chan bool 34 + shutdownResult chan []error 31 35 32 36 ssl bool 33 37 } 34 38 35 39 type activeSub struct { 36 40 pds *models.PDS 41 + lk sync.RWMutex 37 42 ctx context.Context 38 43 cancel func() 39 44 } ··· 41 46 func NewSlurper(db *gorm.DB, cb IndexCallback, ssl bool) (*Slurper, error) { 42 47 db.AutoMigrate(&SlurpConfig{}) 43 48 s := &Slurper{ 44 - cb: cb, 45 - db: db, 46 - active: make(map[string]*activeSub), 47 - ssl: ssl, 49 + cb: cb, 50 + db: db, 51 + active: make(map[string]*activeSub), 52 + ssl: ssl, 53 + shutdownChan: make(chan bool), 54 + shutdownResult: make(chan []error), 48 55 } 49 56 if err := s.loadConfig(); err != nil { 50 57 return nil, err 51 58 } 52 59 60 + // Start a goroutine to flush cursors to the DB every 30s 61 + go func() { 62 + for { 63 + select { 64 + case <-s.shutdownChan: 65 + log.Info("flushing PDS cursors on shutdown") 66 + ctx := context.Background() 67 + ctx, span := otel.Tracer("feedmgr").Start(ctx, "CursorFlusherShutdown") 68 + defer span.End() 69 + var errs []error 70 + if errs = s.flushCursors(ctx); len(errs) > 0 { 71 + for _, err := range errs { 72 + log.Errorf("failed to flush cursors on shutdown: %s", err) 73 + } 74 + } 75 + log.Info("done flushing PDS cursors on shutdown") 76 + s.shutdownResult <- errs 77 + return 78 + case <-time.After(time.Second * 10): 79 + log.Debug("flushing PDS cursors") 80 + ctx := context.Background() 81 + ctx, span := otel.Tracer("feedmgr").Start(ctx, "CursorFlusher") 82 + defer span.End() 83 + if errs := s.flushCursors(ctx); len(errs) > 0 { 84 + for _, err := range errs { 85 + log.Errorf("failed to flush cursors: %s", err) 86 + } 87 + } 88 + log.Debug("done flushing PDS cursors") 89 + } 90 + } 91 + }() 92 + 53 93 return s, nil 54 94 } 55 95 96 + // Shutdown shuts down the slurper 97 + func (s *Slurper) Shutdown() []error { 98 + s.shutdownChan <- true 99 + log.Info("waiting for slurper shutdown") 100 + errs := <-s.shutdownResult 101 + if len(errs) > 0 { 102 + for _, err := range errs { 103 + log.Errorf("shutdown error: %s", err) 104 + } 105 + } 106 + log.Info("slurper shutdown complete") 107 + return errs 108 + } 109 + 56 110 func (s *Slurper) loadConfig() error { 57 111 var sc SlurpConfig 58 112 if err := s.db.Find(&sc).Error; err != nil { ··· 140 194 } 141 195 142 196 ctx, cancel := context.WithCancel(context.Background()) 143 - s.active[host] = &activeSub{ 197 + sub := activeSub{ 144 198 pds: &peering, 145 199 ctx: ctx, 146 200 cancel: cancel, 147 201 } 202 + s.active[host] = &sub 148 203 149 - go s.subscribeWithRedialer(ctx, &peering) 204 + go s.subscribeWithRedialer(ctx, &peering, &sub) 150 205 151 206 return nil 152 207 } ··· 164 219 pds := pds 165 220 166 221 ctx, cancel := context.WithCancel(context.Background()) 167 - s.active[pds.Host] = &activeSub{ 222 + sub := activeSub{ 168 223 pds: &pds, 169 224 ctx: ctx, 170 225 cancel: cancel, 171 226 } 172 - go s.subscribeWithRedialer(ctx, &pds) 227 + s.active[pds.Host] = &sub 228 + go s.subscribeWithRedialer(ctx, &pds, &sub) 173 229 } 174 230 175 231 return nil 176 232 } 177 233 178 - func (s *Slurper) subscribeWithRedialer(ctx context.Context, host *models.PDS) { 234 + func (s *Slurper) subscribeWithRedialer(ctx context.Context, host *models.PDS, sub *activeSub) { 179 235 defer func() { 180 236 s.lk.Lock() 181 237 defer s.lk.Unlock() ··· 221 277 222 278 log.Info("event subscription response code: ", res.StatusCode) 223 279 224 - if err := s.handleConnection(ctx, host, con, &cursor); err != nil { 280 + if err := s.handleConnection(ctx, host, con, &cursor, sub); err != nil { 225 281 if errors.Is(err, ErrTimeoutShutdown) { 226 282 log.Infof("shutting down pds subscription to %s, no activity after %s", host.Host, EventsTimeout) 227 283 return ··· 247 303 248 304 var EventsTimeout = time.Minute 249 305 250 - func (s *Slurper) handleConnection(ctx context.Context, host *models.PDS, con *websocket.Conn, lastCursor *int64) error { 306 + func (s *Slurper) handleConnection(ctx context.Context, host *models.PDS, con *websocket.Conn, lastCursor *int64, sub *activeSub) error { 251 307 ctx, cancel := context.WithCancel(ctx) 252 308 defer cancel() 253 309 ··· 261 317 } 262 318 *lastCursor = evt.Seq 263 319 264 - if err := s.updateCursor(host, *lastCursor); err != nil { 320 + if err := s.updateCursor(sub, *lastCursor); err != nil { 265 321 return fmt.Errorf("updating cursor: %w", err) 266 322 } 267 323 ··· 276 332 } 277 333 *lastCursor = evt.Seq 278 334 279 - if err := s.updateCursor(host, *lastCursor); err != nil { 335 + if err := s.updateCursor(sub, *lastCursor); err != nil { 280 336 return fmt.Errorf("updating cursor: %w", err) 281 337 } 282 338 ··· 291 347 } 292 348 *lastCursor = evt.Seq 293 349 294 - if err := s.updateCursor(host, *lastCursor); err != nil { 350 + if err := s.updateCursor(sub, *lastCursor); err != nil { 295 351 return fmt.Errorf("updating cursor: %w", err) 296 352 } 297 353 ··· 306 362 } 307 363 *lastCursor = evt.Seq 308 364 309 - if err := s.updateCursor(host, *lastCursor); err != nil { 365 + if err := s.updateCursor(sub, *lastCursor); err != nil { 310 366 return fmt.Errorf("updating cursor: %w", err) 311 367 } 312 368 ··· 336 392 return events.HandleRepoStream(ctx, con, pool) 337 393 } 338 394 339 - func (s *Slurper) updateCursor(host *models.PDS, curs int64) error { 340 - return s.db.Model(models.PDS{}).Where("id = ?", host.ID).UpdateColumn("cursor", curs).Error 395 + func (s *Slurper) updateCursor(sub *activeSub, curs int64) error { 396 + sub.lk.Lock() 397 + defer sub.lk.Unlock() 398 + sub.pds.Cursor = curs 399 + return nil 400 + } 401 + 402 + // flushCursors updates the PDS cursors in the DB for all active subscriptions 403 + func (s *Slurper) flushCursors(ctx context.Context) []error { 404 + ctx, span := otel.Tracer("feedmgr").Start(ctx, "flushCursors") 405 + defer span.End() 406 + 407 + s.lk.Lock() 408 + defer s.lk.Unlock() 409 + 410 + errs := []error{} 411 + 412 + // Iterate over active subs and update the PDS cursor in the DB 413 + for _, sub := range s.active { 414 + sub.lk.RLock() 415 + if err := s.db.WithContext(ctx).Model(models.PDS{}).Where("id = ?", sub.pds.ID).UpdateColumn("cursor", sub.pds.Cursor).Error; err != nil { 416 + errs = append(errs, err) 417 + } 418 + sub.lk.RUnlock() 419 + } 420 + 421 + return errs 341 422 } 342 423 343 424 func (s *Slurper) GetActiveList() []string {
+177 -142
cmd/bigsky/main.go
··· 4 4 "context" 5 5 "fmt" 6 6 "os" 7 + "os/signal" 7 8 "path/filepath" 9 + "syscall" 8 10 "time" 9 11 10 12 "github.com/bluesky-social/indigo/api" ··· 49 51 } 50 52 51 53 func run(args []string) { 52 - 53 54 app := cli.App{ 54 55 Name: "bigsky", 55 56 Usage: "atproto BGS/firehose daemon", ··· 120 121 }, 121 122 } 122 123 123 - app.Action = func(cctx *cli.Context) error { 124 + app.Action = Bigsky 125 + err := app.Run(os.Args) 126 + if err != nil { 127 + log.Fatal(err) 128 + } 129 + } 124 130 125 - if cctx.Bool("jaeger") { 126 - url := "http://localhost:14268/api/traces" 127 - exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url))) 128 - if err != nil { 129 - return err 130 - } 131 - tp := tracesdk.NewTracerProvider( 132 - // Always be sure to batch in production. 133 - tracesdk.WithBatcher(exp), 134 - // Record information about this application in a Resource. 135 - tracesdk.WithResource(resource.NewWithAttributes( 136 - semconv.SchemaURL, 137 - semconv.ServiceNameKey.String("bgs"), 138 - attribute.String("environment", "test"), 139 - attribute.Int64("ID", 1), 140 - )), 141 - ) 131 + func Bigsky(cctx *cli.Context) error { 132 + // Trap SIGINT to trigger a shutdown. 133 + signals := make(chan os.Signal, 1) 134 + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 142 135 143 - otel.SetTracerProvider(tp) 136 + if cctx.Bool("jaeger") { 137 + url := "http://localhost:14268/api/traces" 138 + exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url))) 139 + if err != nil { 140 + return err 144 141 } 142 + tp := tracesdk.NewTracerProvider( 143 + // Always be sure to batch in production. 144 + tracesdk.WithBatcher(exp), 145 + // Record information about this application in a Resource. 146 + tracesdk.WithResource(resource.NewWithAttributes( 147 + semconv.SchemaURL, 148 + semconv.ServiceNameKey.String("bgs"), 149 + attribute.String("environment", "test"), 150 + attribute.Int64("ID", 1), 151 + )), 152 + ) 145 153 146 - // Enable OTLP HTTP exporter 147 - // For relevant environment variables: 148 - // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables 149 - // At a minimum, you need to set 150 - // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 151 - if os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") != "" { 152 - ctx, cancel := context.WithCancel(context.Background()) 153 - defer cancel() 154 + otel.SetTracerProvider(tp) 155 + } 154 156 155 - exp, err := otlptracehttp.New(ctx) 156 - if err != nil { 157 - log.Fatalw("failed to create trace exporter", "error", err) 158 - } 159 - defer func() { 160 - ctx, cancel := context.WithTimeout(context.Background(), time.Second) 161 - defer cancel() 162 - if err := exp.Shutdown(ctx); err != nil { 163 - log.Errorw("failed to shutdown trace exporter", "error", err) 164 - } 165 - }() 157 + // Enable OTLP HTTP exporter 158 + // For relevant environment variables: 159 + // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables 160 + // At a minimum, you need to set 161 + // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 162 + if os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") != "" { 163 + ctx, cancel := context.WithCancel(context.Background()) 164 + defer cancel() 166 165 167 - tp := tracesdk.NewTracerProvider( 168 - tracesdk.WithBatcher(exp), 169 - tracesdk.WithResource(resource.NewWithAttributes( 170 - semconv.SchemaURL, 171 - semconv.ServiceNameKey.String("bgs"), 172 - attribute.String("env", os.Getenv("ENVIRONMENT")), // DataDog 173 - attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others 174 - attribute.Int64("ID", 1), 175 - )), 176 - ) 177 - otel.SetTracerProvider(tp) 166 + exp, err := otlptracehttp.New(ctx) 167 + if err != nil { 168 + log.Fatalw("failed to create trace exporter", "error", err) 178 169 } 170 + defer func() { 171 + ctx, cancel := context.WithTimeout(context.Background(), time.Second) 172 + defer cancel() 173 + if err := exp.Shutdown(ctx); err != nil { 174 + log.Errorw("failed to shutdown trace exporter", "error", err) 175 + } 176 + }() 179 177 180 - // ensure data directory exists; won't error if it does 181 - datadir := cctx.String("data-dir") 182 - csdir := filepath.Join(datadir, "carstore") 183 - if err := os.MkdirAll(datadir, os.ModePerm); err != nil { 184 - return err 185 - } 178 + tp := tracesdk.NewTracerProvider( 179 + tracesdk.WithBatcher(exp), 180 + tracesdk.WithResource(resource.NewWithAttributes( 181 + semconv.SchemaURL, 182 + semconv.ServiceNameKey.String("bgs"), 183 + attribute.String("env", os.Getenv("ENVIRONMENT")), // DataDog 184 + attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others 185 + attribute.Int64("ID", 1), 186 + )), 187 + ) 188 + otel.SetTracerProvider(tp) 189 + } 186 190 187 - dburl := cctx.String("db-url") 188 - db, err := cliutil.SetupDatabase(dburl) 189 - if err != nil { 191 + // ensure data directory exists; won't error if it does 192 + datadir := cctx.String("data-dir") 193 + csdir := filepath.Join(datadir, "carstore") 194 + if err := os.MkdirAll(datadir, os.ModePerm); err != nil { 195 + return err 196 + } 197 + 198 + dburl := cctx.String("db-url") 199 + db, err := cliutil.SetupDatabase(dburl) 200 + if err != nil { 201 + return err 202 + } 203 + 204 + csdburl := cctx.String("carstore-db-url") 205 + csdb, err := cliutil.SetupDatabase(csdburl) 206 + if err != nil { 207 + return err 208 + } 209 + 210 + if cctx.Bool("db-tracing") { 211 + if err := db.Use(tracing.NewPlugin()); err != nil { 190 212 return err 191 213 } 192 - 193 - csdburl := cctx.String("carstore-db-url") 194 - csdb, err := cliutil.SetupDatabase(csdburl) 195 - if err != nil { 214 + if err := csdb.Use(tracing.NewPlugin()); err != nil { 196 215 return err 197 216 } 217 + } 198 218 199 - if cctx.Bool("db-tracing") { 200 - if err := db.Use(tracing.NewPlugin()); err != nil { 201 - return err 202 - } 203 - if err := csdb.Use(tracing.NewPlugin()); err != nil { 204 - return err 205 - } 206 - } 219 + os.MkdirAll(filepath.Dir(csdir), os.ModePerm) 220 + cstore, err := carstore.NewCarStore(csdb, csdir) 221 + if err != nil { 222 + return err 223 + } 207 224 208 - os.MkdirAll(filepath.Dir(csdir), os.ModePerm) 209 - cstore, err := carstore.NewCarStore(csdb, csdir) 210 - if err != nil { 211 - return err 212 - } 225 + mr := did.NewMultiResolver() 213 226 214 - mr := did.NewMultiResolver() 215 - 216 - didr := &api.PLCServer{Host: cctx.String("plc-host")} 217 - mr.AddHandler("plc", didr) 227 + didr := &api.PLCServer{Host: cctx.String("plc-host")} 228 + mr.AddHandler("plc", didr) 218 229 219 - webr := did.WebResolver{} 220 - if cctx.Bool("crawl-insecure-ws") { 221 - webr.Insecure = true 222 - } 223 - mr.AddHandler("web", &webr) 230 + webr := did.WebResolver{} 231 + if cctx.Bool("crawl-insecure-ws") { 232 + webr.Insecure = true 233 + } 234 + mr.AddHandler("web", &webr) 224 235 225 - cachedidr := plc.NewCachingDidResolver(mr, time.Minute*5, 1000) 236 + cachedidr := plc.NewCachingDidResolver(mr, time.Minute*5, 1000) 226 237 227 - kmgr := indexer.NewKeyManager(cachedidr, nil) 238 + kmgr := indexer.NewKeyManager(cachedidr, nil) 228 239 229 - repoman := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cstore, kmgr) 240 + repoman := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cstore, kmgr) 230 241 231 - var persister events.EventPersistence 242 + var persister events.EventPersistence 232 243 233 - if dpd := cctx.String("disk-persister-dir"); dpd != "" { 234 - dp, err := events.NewDiskPersistence(dpd, "", db, events.DefaultDiskPersistOptions()) 235 - if err != nil { 236 - return fmt.Errorf("setting up disk persister: %w", err) 237 - } 238 - persister = dp 239 - } else { 240 - dbp, err := events.NewDbPersistence(db, cstore, nil) 241 - if err != nil { 242 - return fmt.Errorf("setting up db event persistence: %w", err) 243 - } 244 - persister = dbp 244 + if dpd := cctx.String("disk-persister-dir"); dpd != "" { 245 + dp, err := events.NewDiskPersistence(dpd, "", db, events.DefaultDiskPersistOptions()) 246 + if err != nil { 247 + return fmt.Errorf("setting up disk persister: %w", err) 245 248 } 249 + persister = dp 250 + } else { 251 + dbp, err := events.NewDbPersistence(db, cstore, nil) 252 + if err != nil { 253 + return fmt.Errorf("setting up db event persistence: %w", err) 254 + } 255 + persister = dbp 256 + } 246 257 247 - evtman := events.NewEventManager(persister) 258 + evtman := events.NewEventManager(persister) 248 259 249 - notifman := &notifs.NullNotifs{} 260 + notifman := &notifs.NullNotifs{} 250 261 251 - ix, err := indexer.NewIndexer(db, notifman, evtman, cachedidr, repoman, true, cctx.Bool("aggregation")) 252 - if err != nil { 253 - return err 262 + ix, err := indexer.NewIndexer(db, notifman, evtman, cachedidr, repoman, true, cctx.Bool("aggregation")) 263 + if err != nil { 264 + return err 265 + } 266 + 267 + rlskip := os.Getenv("BSKY_SOCIAL_RATE_LIMIT_SKIP") 268 + ix.ApplyPDSClientSettings = func(c *xrpc.Client) { 269 + if c.Host == "https://bsky.social" && rlskip != "" { 270 + c.Headers = map[string]string{ 271 + "x-ratelimit-bypass": rlskip, 272 + } 254 273 } 274 + } 255 275 256 - rlskip := os.Getenv("BSKY_SOCIAL_RATE_LIMIT_SKIP") 257 - ix.ApplyPDSClientSettings = func(c *xrpc.Client) { 258 - if c.Host == "https://bsky.social" && rlskip != "" { 259 - c.Headers = map[string]string{ 260 - "x-ratelimit-bypass": rlskip, 261 - } 262 - } 276 + repoman.SetEventHandler(func(ctx context.Context, evt *repomgr.RepoEvent) { 277 + if err := ix.HandleRepoEvent(ctx, evt); err != nil { 278 + log.Errorw("failed to handle repo event", "err", err) 263 279 } 280 + }) 264 281 265 - repoman.SetEventHandler(func(ctx context.Context, evt *repomgr.RepoEvent) { 266 - if err := ix.HandleRepoEvent(ctx, evt); err != nil { 267 - log.Errorw("failed to handle repo event", "err", err) 268 - } 269 - }) 282 + var blobstore blobs.BlobStore 283 + if bsdir := cctx.String("disk-blob-store"); bsdir != "" { 284 + blobstore = &blobs.DiskBlobStore{bsdir} 285 + } 270 286 271 - var blobstore blobs.BlobStore 272 - if bsdir := cctx.String("disk-blob-store"); bsdir != "" { 273 - blobstore = &blobs.DiskBlobStore{bsdir} 287 + var hr api.HandleResolver = &api.ProdHandleResolver{} 288 + if cctx.StringSlice("handle-resolver-hosts") != nil { 289 + hr = &api.TestHandleResolver{ 290 + TrialHosts: cctx.StringSlice("handle-resolver-hosts"), 274 291 } 292 + } 275 293 276 - var hr api.HandleResolver = &api.ProdHandleResolver{} 277 - if cctx.StringSlice("handle-resolver-hosts") != nil { 278 - hr = &api.TestHandleResolver{ 279 - TrialHosts: cctx.StringSlice("handle-resolver-hosts"), 280 - } 294 + bgs, err := bgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, hr, !cctx.Bool("crawl-insecure-ws")) 295 + if err != nil { 296 + return err 297 + } 298 + 299 + if tok := cctx.String("admin-key"); tok != "" { 300 + if err := bgs.CreateAdminToken(tok); err != nil { 301 + return fmt.Errorf("failed to set up admin token: %w", err) 281 302 } 303 + } 282 304 283 - bgs, err := bgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, hr, !cctx.Bool("crawl-insecure-ws")) 284 - if err != nil { 285 - return err 305 + // set up pprof endpoint 306 + go func() { 307 + if err := bgs.StartDebug(cctx.String("debug-listen")); err != nil { 308 + panic(err) 286 309 } 310 + }() 287 311 288 - if tok := cctx.String("admin-key"); tok != "" { 289 - if err := bgs.CreateAdminToken(tok); err != nil { 290 - return fmt.Errorf("failed to set up admin token: %w", err) 291 - } 292 - } 312 + bgsErr := make(chan error, 1) 293 313 294 - // set up pprof endpoint 295 - go func() { 296 - if err := bgs.StartDebug(cctx.String("debug-listen")); err != nil { 297 - panic(err) 298 - } 299 - }() 314 + go func() { 315 + err := bgs.Start(cctx.String("api-listen")) 316 + bgsErr <- err 317 + }() 300 318 301 - return bgs.Start(cctx.String("api-listen")) 319 + select { 320 + case <-signals: 321 + log.Info("received shutdown signal") 322 + errs := bgs.Shutdown() 323 + for err := range errs { 324 + log.Errorw("error during BGS shutdown", "err", err) 325 + } 326 + case err := <-bgsErr: 327 + if err != nil { 328 + log.Errorw("error during BGS startup", "err", err) 329 + } 330 + log.Info("shutting down") 331 + errs := bgs.Shutdown() 332 + for err := range errs { 333 + log.Errorw("error during BGS shutdown", "err", err) 334 + } 302 335 } 303 336 304 - app.RunAndExitOnError() 337 + log.Info("shutdown complete") 338 + 339 + return nil 305 340 }
+5 -2
repomgr/dbheadstore.go
··· 22 22 } 23 23 24 24 func (hs *DbHeadStore) InitUser(ctx context.Context, user models.Uid, root cid.Cid) error { 25 - if err := hs.db.Create(&RepoHead{ 25 + if err := hs.db.WithContext(ctx).Create(&RepoHead{ 26 26 Usr: user, 27 27 Root: root.String(), 28 28 }).Error; err != nil { ··· 33 33 } 34 34 35 35 func (hs *DbHeadStore) UpdateUserRepoHead(ctx context.Context, user models.Uid, root cid.Cid) error { 36 + ctx, span := otel.Tracer("repoman").Start(ctx, "UpdateUserRepoHead") 37 + defer span.End() 38 + 36 39 if err := hs.db.WithContext(ctx).Clauses(clause.OnConflict{ 37 40 Columns: []clause.Column{{Name: "usr"}}, 38 41 DoUpdates: clause.AssignmentColumns([]string{"root"}), ··· 51 54 defer span.End() 52 55 53 56 var headrec RepoHead 54 - if err := hs.db.Find(&headrec, "usr = ?", user).Error; err != nil { 57 + if err := hs.db.WithContext(ctx).Find(&headrec, "usr = ?", user).Error; err != nil { 55 58 return cid.Undef, err 56 59 } 57 60