this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Updates for performance and reduced crashing at high concurrency

Jaz deb68763 e15d79c6

+140 -24
+20 -16
backfill/next/backfill.go
··· 6 6 "fmt" 7 7 "log/slog" 8 8 "net/http" 9 + "strings" 9 10 "sync" 10 11 "time" 11 12 12 13 "github.com/bluesky-social/indigo/repo" 14 + "github.com/prometheus/client_golang/prometheus" 13 15 14 16 "github.com/ipfs/go-cid" 15 17 "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ··· 127 129 log.Info("creating new PDS backfiller", "pds", pds) 128 130 opts := DefaultPDSBackfillerOptions() 129 131 opts.ParallelBackfills = b.perPDSBackfillConcurrency 132 + opts.ParallelRecordCreates = b.globalRecordCreateConcurrency / 10 130 133 opts.SyncRequestsPerSecond = b.perPDSSyncsPerSecond 131 134 opts.RecordCreateLimiter = b.globalRecordCreationLimiter 132 135 opts.NSIDFilter = b.NSIDFilter 136 + 137 + if strings.HasSuffix(pds, ".host.bsky.network") { 138 + opts.Client.Timeout = 600 * time.Second 139 + } 133 140 134 141 pdsBackfiller := NewPDSBackfiller(pds, pds, b.Store, b.HandleCreateRecord, opts) 135 142 b.pdsBackfillers[pds] = pdsBackfiller ··· 173 180 174 181 recordCreateConcurrency int 175 182 recordCreateLimiter *rate.Limiter 183 + recordsProcessed prometheus.Counter 176 184 177 185 NSIDFilter string 178 186 ··· 198 206 SyncRequestsPerSecond: 2, 199 207 Client: &http.Client{ 200 208 Transport: otelhttp.NewTransport(http.DefaultTransport), 201 - Timeout: 600 * time.Second, 209 + Timeout: 120 * time.Second, 202 210 }, 203 211 } 204 212 } ··· 222 230 HandleCreateRecord: handleCreate, 223 231 backfillConcurrency: opts.ParallelBackfills, 224 232 recordCreateConcurrency: opts.ParallelRecordCreates, 233 + recordsProcessed: backfillRecordsProcessed.WithLabelValues(name), 225 234 NSIDFilter: opts.NSIDFilter, 226 235 syncLimiter: rate.NewLimiter(rate.Limit(opts.SyncRequestsPerSecond), opts.SyncRequestsPerSecond), 227 236 recordCreateLimiter: opts.RecordCreateLimiter, ··· 322 331 nodeCid cid.Cid 323 332 } 324 333 325 - type recordResult struct { 326 - recordPath string 327 - err error 328 - } 329 - 330 334 type FetchRepoError struct { 331 335 StatusCode int 332 336 
Status string ··· 370 374 numRecords := 0 371 375 numRoutines := b.recordCreateConcurrency 372 376 recordQueue := make(chan recordQueueItem, numRoutines) 373 - recordResults := make(chan recordResult, numRoutines) 377 + recordErrors := make(chan error, numRoutines) 374 378 375 379 // Producer routine 376 380 go func() { ··· 395 399 for item := range recordQueue { 396 400 blk, err := r.Blockstore().Get(ctx, item.nodeCid) 397 401 if err != nil { 398 - recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to get blocks for record: %w", err)} 402 + recordErrors <- fmt.Errorf("failed to get blocks for record %q: %w", item.recordPath, err) 399 403 continue 400 404 } 401 405 402 406 raw := blk.RawData() 403 407 404 408 if err := b.recordCreateLimiter.Wait(ctx); err != nil { 405 - recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to wait for record create limiter: %w", err)} 409 + recordErrors <- fmt.Errorf("failed to wait for record create limiter %q: %w", item.recordPath, err) 406 410 break 407 411 } 408 412 409 413 err = b.HandleCreateRecord(ctx, repoDID, rev, item.recordPath, &raw, &item.nodeCid) 410 414 if err != nil { 411 - recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)} 415 + recordErrors <- fmt.Errorf("failed to handle create record %q: %w", item.recordPath, err) 412 416 continue 413 417 } 414 418 415 - backfillRecordsProcessed.WithLabelValues(b.Name).Inc() 416 - recordResults <- recordResult{recordPath: item.recordPath, err: err} 419 + b.recordsProcessed.Inc() 420 + recordErrors <- nil 417 421 } 418 422 }() 419 423 } ··· 423 427 // Handle results 424 428 go func() { 425 429 defer resultWG.Done() 426 - for result := range recordResults { 427 - if result.err != nil { 428 - log.Error("Error processing record", "record", result.recordPath, "error", result.err) 430 + for err := range recordErrors { 431 + if err != nil { 432 + 
log.Error("Error processing record", "error", err) 429 433 } 430 434 } 431 435 }() 432 436 433 437 wg.Wait() 434 - close(recordResults) 438 + close(recordErrors) 435 439 resultWG.Wait() 436 440 437 441 log.Info("backfill complete",
+42 -1
backfill/next/gormstore.go
··· 114 114 return nil 115 115 } 116 116 117 + func (s *Gormstore) loadJobsForPDS(ctx context.Context, pds string, limit int) error { 118 + enqueuedIndexClause := "" 119 + retryableIndexClause := "" 120 + 121 + // If the DB is a SQLite DB, we can use INDEXED BY to speed up the query 122 + if s.db.Dialector.Name() == "sqlite" { 123 + enqueuedIndexClause = "INDEXED BY enqueued_pds_job_idx" 124 + retryableIndexClause = "INDEXED BY retryable_pds_job_idx" 125 + } 126 + 127 + enqueuedSelect := fmt.Sprintf(`SELECT pds, repo FROM gorm_db_jobs %s WHERE pds = ? AND state = 'enqueued' LIMIT ?`, enqueuedIndexClause) 128 + retryableSelect := fmt.Sprintf(`SELECT pds, repo FROM gorm_db_jobs %s WHERE pds = ? AND state like 'failed%%' AND (retry_after = NULL OR retry_after < ?) LIMIT ?`, retryableIndexClause) 129 + 130 + todoJobs := make([]todoJob, 0, limit) 131 + if err := s.db.Raw(enqueuedSelect, pds, limit).Scan(&todoJobs).Error; err != nil { 132 + return err 133 + } 134 + 135 + if len(todoJobs) < limit { 136 + moreTodo := make([]todoJob, 0, limit-len(todoJobs)) 137 + if err := s.db.Raw(retryableSelect, pds, time.Now(), limit-len(todoJobs)).Scan(&moreTodo).Error; err != nil { 138 + return err 139 + } 140 + todoJobs = append(todoJobs, moreTodo...) 
141 + } 142 + 143 + for _, job := range todoJobs { 144 + if pdsQueue, ok := s.pdsQueues[job.pds]; ok { 145 + pdsQueue.qlk.Lock() 146 + pdsQueue.taskQueue = append(pdsQueue.taskQueue, job.repo) 147 + pdsQueue.qlk.Unlock() 148 + } else { 149 + s.pdsQueues[job.pds] = &queue{ 150 + taskQueue: []string{job.repo}, 151 + } 152 + } 153 + } 154 + 155 + return nil 156 + } 157 + 117 158 func (s *Gormstore) GetOrCreateJob(ctx context.Context, pds, repo, state string) (Job, error) { 118 159 j, err := s.getJob(ctx, repo) 119 160 if err == nil { ··· 288 329 defer pdsQueue.qlk.Unlock() 289 330 290 331 if len(pdsQueue.taskQueue) == 0 { 291 - if err := s.loadJobs(ctx, 1000); err != nil { 332 + if err := s.loadJobsForPDS(ctx, pds, 1000); err != nil { 292 333 return nil, err 293 334 } 294 335
+78 -7
cmd/linear/main.go
··· 7 7 "log/slog" 8 8 "net/http" 9 9 _ "net/http/pprof" 10 + "net/url" 10 11 "os" 11 12 "os/signal" 13 + "runtime/debug" 12 14 "strings" 13 15 "syscall" 14 16 ··· 79 81 EnvVars: []string{"LINEAR_OUTPUT_FILE"}, 80 82 Value: "data/linear.jsonl", 81 83 }, 84 + &cli.BoolFlag{ 85 + Name: "discover-pds", 86 + Usage: "discover PDSs from the listHosts endpoint on the Relay", 87 + Value: false, 88 + EnvVars: []string{"LINEAR_DISCOVER_PDS"}, 89 + }, 90 + &cli.StringFlag{ 91 + Name: "relay-host", 92 + Usage: "host of the Relay to use for discovery", 93 + Value: "https://relay.pop1.bsky.network", 94 + EnvVars: []string{"LINEAR_RELAY_HOST"}, 95 + }, 82 96 }, 83 97 }, 84 98 } ··· 106 120 logger.Error("failed to set up metrics listener", "err", err) 107 121 } 108 122 }() 123 + 124 + crashout, err := os.OpenFile("crash.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) 125 + if err != nil { 126 + return fmt.Errorf("failed to open crash log file: %w", err) 127 + } 128 + defer crashout.Close() 129 + 130 + debug.SetCrashOutput(crashout, debug.CrashOptions{}) 109 131 110 132 store, db, err := setupBackfillStore(cctx.Context, cctx.String("backfill-sqlite-path")) 111 133 if err != nil { ··· 139 161 140 162 linear.bf = bf 141 163 142 - err = store.LoadJobs(ctx) 143 - if err != nil { 144 - return fmt.Errorf("failed to load jobs from store: %w", err) 145 - } 146 - 147 164 // Walk the PDS list's listRepos endpoints and add jobs to the backfiller 148 165 pdsList := cctx.StringSlice("pds-list") 149 166 167 + if cctx.Bool("discover-pds") { 168 + logger.Info("discovering PDSs from Relay", "relayHost", cctx.String("relay-host")) 169 + xrpcc := xrpc.Client{} 170 + xrpcc.Host = cctx.String("relay-host") 171 + 172 + limiter := rate.NewLimiter(5, 1) 173 + 174 + newPDSList := []string{} 175 + 176 + cursor := "" 177 + for { 178 + if err := limiter.Wait(ctx); err != nil { 179 + logger.Error("failed to wait for rate limiter", "err", err) 180 + break 181 + } 182 + page, err := 
comatproto.SyncListHosts(ctx, &xrpcc, cursor, 1000) 183 + if err != nil { 184 + return fmt.Errorf("failed to list hosts from Relay: %w", err) 185 + } 186 + for _, host := range page.Hosts { 187 + if host.Hostname != "" && host.Status != nil && *host.Status == "active" { 188 + newPDSList = append(newPDSList, host.Hostname) 189 + } 190 + } 191 + if page.Cursor == nil { 192 + break 193 + } 194 + cursor = *page.Cursor 195 + } 196 + 197 + pdsList = newPDSList 198 + 199 + logger.Info("discovered PDSs", "count", len(newPDSList), "pdsList", newPDSList) 200 + } 201 + 150 202 go func() { 151 203 for _, pds := range pdsList { 204 + // Ensure the PDS host is valid 205 + u, err := url.Parse(fmt.Sprintf("https://%s", pds)) 206 + if err != nil || u.Host == "" { 207 + logger.Error("invalid PDS host", "pds", pds, "err", err) 208 + continue 209 + } 210 + 211 + pds = u.Host 212 + 152 213 go func(pds string) { 153 214 logger.Info("enqueuing PDS for backfill", "pds", pds) 154 215 ··· 161 222 for { 162 223 if err := listLimiter.Wait(ctx); err != nil { 163 224 logger.Error("failed to wait for rate limiter", "pds", pds, "err", err) 164 - continue 225 + break 165 226 } 166 227 167 228 page, err := comatproto.SyncListRepos(ctx, &xrpcc, cursor, 1000) 168 229 if err != nil { 169 230 logger.Error("failed to list repos for PDS", "pds", pds, "err", err) 170 - continue 231 + break 171 232 } 172 233 173 234 for _, repo := range page.Repos { ··· 259 320 return nil, nil, err 260 321 } 261 322 323 + if err := bfdb.Exec("PRAGMA synchronous=normal;").Error; err != nil { 324 + return nil, nil, err 325 + } 326 + 262 327 if err := bfdb.AutoMigrate(&backfill.GormDBJob{}); err != nil { 263 328 return nil, nil, err 264 329 } ··· 267 332 if err := store.LoadJobs(ctx); err != nil { 268 333 return nil, nil, err 269 334 } 335 + 336 + rawDB, err := bfdb.DB() 337 + if err != nil { 338 + return nil, nil, fmt.Errorf("failed to get raw DB from gorm: %w", err) 339 + } 340 + rawDB.SetMaxOpenConns(10) 270 341 271 342 
return store, bfdb, nil 272 343 }