this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Performance tuning

Jaz 660d4f45 deb68763

+83 -37
+16 -12
backfill/next/backfill.go
··· 5 5 "errors" 6 6 "fmt" 7 7 "log/slog" 8 + "math" 8 9 "net/http" 9 10 "strings" 10 11 "sync" ··· 115 116 116 117 func (b *Backfiller) EnqueueJob(ctx context.Context, pds, repo string) error { 117 118 log := slog.With("component", "backfiller", "name", b.Name, "pds", pds, "repo", repo) 118 - log.Info("enqueueing backfill job") 119 + log.Debug("enqueueing backfill job") 119 120 120 121 if err := b.Store.EnqueueJob(ctx, pds, repo); err != nil { 121 122 log.Error("failed to enqueue backfill job", "error", err) ··· 128 129 if _, exists := b.pdsBackfillers[pds]; !exists { 129 130 log.Info("creating new PDS backfiller", "pds", pds) 130 131 opts := DefaultPDSBackfillerOptions() 131 - opts.ParallelBackfills = b.perPDSBackfillConcurrency 132 - opts.ParallelRecordCreates = b.globalRecordCreateConcurrency / 10 132 + opts.ParallelRecordCreates = 2 133 133 opts.SyncRequestsPerSecond = b.perPDSSyncsPerSecond 134 134 opts.RecordCreateLimiter = b.globalRecordCreationLimiter 135 135 opts.NSIDFilter = b.NSIDFilter 136 136 137 137 if strings.HasSuffix(pds, ".host.bsky.network") { 138 138 opts.Client.Timeout = 600 * time.Second 139 + opts.ParallelBackfills = b.perPDSBackfillConcurrency 140 + } else { 141 + opts.ParallelBackfills = int(math.Min(2, float64(b.perPDSBackfillConcurrency))) 139 142 } 140 143 141 144 pdsBackfiller := NewPDSBackfiller(pds, pds, b.Store, b.HandleCreateRecord, opts) ··· 143 146 pdsBackfiller.Start() 144 147 } 145 148 backfillJobsEnqueued.WithLabelValues(b.Name).Inc() 146 - log.Info("backfill job enqueued successfully") 149 + log.Debug("backfill job enqueued successfully") 147 150 return nil 148 151 } 149 152 ··· 200 203 func DefaultPDSBackfillerOptions() *PDSBackfillerOptions { 201 204 return &PDSBackfillerOptions{ 202 205 ParallelBackfills: 10, 203 - ParallelRecordCreates: 100, 206 + ParallelRecordCreates: 2, 204 207 RecordCreateLimiter: rate.NewLimiter(rate.Limit(100), 100), 205 208 NSIDFilter: "", 206 209 SyncRequestsPerSecond: 2, ··· 267 270 job, err := b.Store.GetNextEnqueuedJob(ctx, b.Hostname) 268 271 if err != nil { 269 272 log.Error("failed to get next enqueued job", "error", err) 270 - time.Sleep(1 * time.Second) 273 + time.Sleep(5 * time.Second) 271 274 continue 272 275 } else if job == nil { 273 - time.Sleep(1 * time.Second) 276 + time.Sleep(5 * time.Second) 274 277 continue 275 278 } 276 279 jobs <- job ··· 283 286 go func() { 284 287 defer b.wg.Done() 285 288 log := log.With("subcomponent", "worker", "worker_id", i) 289 + defer log.Info("stopping backfill worker") 290 + 286 291 for job := range jobs { 287 292 select { 288 293 case <-b.stop: 289 - log.Info("stopping backfill worker") 290 294 return 291 295 default: 292 296 } 293 297 294 298 log := log.With("job", job.Repo(), "state", job.State()) 295 - log.Info("processing backfill job") 299 + log.Debug("processing backfill job") 296 300 297 301 if err := job.SetState(ctx, StateInProgress); err != nil { 298 302 log.Error("failed to set job state to in_progress", "error", err) ··· 303 307 if err != nil { 304 308 log.Error("failed to backfill repo", "error", err) 305 309 } else { 306 - log.Info("backfill job completed successfully", "new_state", newState) 310 + log.Debug("backfill job completed successfully", "new_state", newState) 307 311 } 308 312 309 313 if err := job.SetState(ctx, newState); err != nil { ··· 359 363 if job.RetryCount() > 0 { 360 364 log = log.With("retry_count", job.RetryCount()) 361 365 } 362 - log.Info(fmt.Sprintf("processing backfill for %s", repoDID)) 366 + log.Debug(fmt.Sprintf("processing backfill for %s", repoDID)) 363 367 364 368 r, err := b.fetchRepo(ctx, repoDID, b.Hostname) 365 369 if err != nil { ··· 438 442 close(recordErrors) 439 443 resultWG.Wait() 440 444 441 - log.Info("backfill complete", 445 + log.Debug("backfill complete", 442 446 "records_backfilled", numRecords, 443 447 "duration", time.Since(start), 444 448 )
+67 -25
cmd/linear/main.go
··· 12 12 "os/signal" 13 13 "runtime/debug" 14 14 "strings" 15 + "sync" 16 + "sync/atomic" 15 17 "syscall" 18 + "time" 16 19 17 20 comatproto "github.com/bluesky-social/indigo/api/atproto" 18 21 "github.com/bluesky-social/indigo/atproto/data" ··· 144 147 logger: logger, 145 148 146 149 out: backfillOutFile, 147 - outChan: make(chan []byte, 100_000), 150 + outChan: make(chan []byte, 1_000_000), 148 151 fileClosed: make(chan struct{}), 149 152 150 153 teardown: make(chan struct{}), 151 154 } 152 155 153 - linear.startWriter() 156 + linear.startWriters() 154 157 155 158 opts := backfill.DefaultBackfillerOptions() 156 159 opts.GlobalRecordCreateConcurrency = 100_000 157 160 opts.PerPDSSyncsPerSecond = 10 158 - opts.PerPDSBackfillConcurrency = 15 161 + opts.PerPDSBackfillConcurrency = 50 159 162 160 163 bf := backfill.NewBackfiller("linear-backfiller-v2", store, linear.handleCreate, opts) 161 164 ··· 232 235 } 233 236 234 237 for _, repo := range page.Repos { 235 - logger.Info("found repo to backfill", "pds", pds, "repo", repo.Did) 238 + logger.Debug("found repo to backfill", "pds", pds, "repo", repo.Did) 239 + 240 + if repo.Active == nil || !*repo.Active { 241 + logger.Debug("skipping inactive repo", "pds", pds, "repo", repo.Did) 242 + continue 243 + } 236 244 237 245 if err := bf.EnqueueJob(ctx, pds, repo.Did); err != nil { 238 246 logger.Error("failed to enqueue job for PDS", "pds", pds, "repo", repo.Did, "err", err) 239 247 } else { 240 - logger.Info("enqueued job for PDS", "pds", pds, "repo", repo.Did) 248 + logger.Debug("enqueued job for PDS", "pds", pds, "repo", repo.Did) 241 249 } 242 250 } 243 251 ··· 302 310 return fmt.Errorf("failed to marshal line to json: %w", err) 303 311 } 304 312 313 + // Append a newline to the JSON line 314 + lineB = append(lineB, '\n') 315 + 305 316 l.outChan <- lineB 306 317 307 318 return nil ··· 320 331 return nil, nil, err 321 332 } 322 333 323 - if err := bfdb.Exec("PRAGMA synchronous=normal;").Error; err != nil { 334 + if err := bfdb.Exec("PRAGMA synchronous=off;").Error; err != nil { 324 335 return nil, nil, err 325 336 } 326 337 ··· 351 362 return parts[0], parts[1], nil 352 363 } 353 364 354 - func (lin *Linear) startWriter() { 365 + func (lin *Linear) startWriters() { 355 366 log := lin.logger.With("source", "writer") 356 - log.Info("starting writer") 357 - newline := []byte("\n") 367 + log.Info("starting writers") 358 368 359 - // Start the writer 369 + // Start some writers to handle output 370 + wg := sync.WaitGroup{} 371 + 372 + recordsProcessed := atomic.Int64{} 373 + 374 + for range 10 { 375 + wg.Add(1) 376 + go func() { 377 + defer wg.Done() 378 + recs := 0 379 + for { 380 + select { 381 + case <-lin.teardown: 382 + log.Info("received shutdown signal, closing writer") 383 + return 384 + case line := <-lin.outChan: 385 + if _, err := lin.out.Write(line); err != nil { 386 + log.Error("failed to write line to output file", "err", err) 387 + } 388 + recs++ 389 + if recs%1000 == 0 { 390 + recordsProcessed.Add(int64(recs)) 391 + recs = 0 392 + } 393 + } 394 + } 395 + }() 396 + } 397 + 360 398 go func() { 399 + ticker := time.NewTicker(10 * time.Second) 400 + defer ticker.Stop() 361 401 for { 362 402 select { 363 403 case <-lin.teardown: 364 - if err := lin.out.Sync(); err != nil { 365 - log.Error("failed to sync output file", "err", err) 366 - } 367 - if err := lin.out.Close(); err != nil { 368 - log.Error("failed to close output file", "err", err) 369 - } 370 - close(lin.fileClosed) 371 - close(lin.outChan) 404 + log.Info("received shutdown signal, stopping metrics ticker") 372 405 return 373 - case line := <-lin.outChan: 374 - if _, err := lin.out.Write(line); err != nil { 375 - log.Error("failed to write line to output file", "err", err) 376 - } 377 - if _, err := lin.out.Write(newline); err != nil { 378 - log.Error("failed to write newline to output file", "err", err) 379 - } 406 + case <-ticker.C: 407 + count := recordsProcessed.Swap(0) 408 + log.Info("processed records", "count", count, "per_second", float64(count)/10.0) 380 409 } 381 410 } 411 + }() 412 + 413 + go func() { 414 + wg.Wait() 415 + if err := lin.out.Sync(); err != nil { 416 + log.Error("failed to sync output file", "err", err) 417 + } 418 + if err := lin.out.Close(); err != nil { 419 + log.Error("failed to close output file", "err", err) 420 + } 421 + log.Info("all writers have finished") 422 + close(lin.fileClosed) 423 + close(lin.outChan) 382 424 }() 383 425 }