this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

More tuning

Jaz 1f363b2a 682eba08

+44 -6
+44 -6
cmd/linear/main.go
··· 185 185 chPassword: cctx.String("clickhouse-password"), 186 186 chDatabase: cctx.String("clickhouse-database"), 187 187 188 - outChan: make(chan *chRow, 1_000_000), // Buffered channel to handle backpressure 188 + outChan: make(chan *chRow, 500_000), 189 189 writingDone: make(chan struct{}), 190 190 191 191 teardown: make(chan struct{}), ··· 194 194 linear.startClickhouseWriter() 195 195 196 196 opts := backfill.DefaultBackfillerOptions() 197 - opts.GlobalRecordCreateConcurrency = 200_000 197 + opts.GlobalRecordCreateConcurrency = 500_000 198 198 opts.PerPDSSyncsPerSecond = 9.5 199 199 opts.PerPDSBackfillConcurrency = 15 200 200 ··· 378 378 return nil, nil, err 379 379 } 380 380 381 + // Extend the busy timeout to 30 seconds 382 + if err := bfdb.Exec("PRAGMA busy_timeout=30000;").Error; err != nil { 383 + return nil, nil, err 384 + } 385 + 381 386 if err := bfdb.AutoMigrate(&backfill.GormDBJob{}); err != nil { 382 387 return nil, nil, err 383 388 } ··· 392 397 return nil, nil, fmt.Errorf("failed to get raw DB from gorm: %w", err) 393 398 } 394 399 rawDB.SetMaxOpenConns(10) 400 + 401 + // Periodically truncate the WAL with a checkpoint 402 + go func() { 403 + ticker := time.NewTicker(5 * time.Minute) 404 + defer ticker.Stop() 405 + for { 406 + select { 407 + case <-ticker.C: 408 + if err := bfdb.Exec("PRAGMA wal_checkpoint(TRUNCATE);").Error; err != nil { 409 + slog.Error("failed to checkpoint WAL", "err", err) 410 + } 411 + case <-ctx.Done(): 412 + return 413 + } 414 + } 415 + }() 395 416 396 417 return store, bfdb, nil 397 418 } ··· 421 442 collection String, 422 443 rkey String, 423 444 cid String, 424 - record JSON, 445 + record String, 425 446 created_at DateTime64(3, 'UTC') 426 447 ) ENGINE = MergeTree 427 448 ORDER BY (collection, did, rkey); ··· 476 497 477 498 insertQueryStr := `INSERT INTO repo_records (did, collection, rkey, cid, record, created_at) VALUES` 478 499 479 - for range 3 { 500 + for range 5 { 480 501 wg.Add(1) 481 502 go func() { 482 503 defer wg.Done() ··· 488 509 collectionCol proto.ColStr 489 510 rkeyCol proto.ColStr 490 511 cidCol proto.ColStr 491 - recordCol proto.ColJSONBytes 512 + recordCol proto.ColBytes 492 513 createdAtCol = new(proto.ColDateTime64).WithPrecision(3) 493 514 ) 494 515 ··· 514 535 } 515 536 defer chClient.Close() 516 537 538 + t := time.NewTicker(30 * time.Second) 539 + defer t.Stop() 540 + 517 541 for { 518 542 select { 519 543 case <-lin.teardown: ··· 529 553 recordCol.Append(row.Record) 530 554 createdAtCol.Append(row.CreatedAt) 531 555 532 - if recs%100_000 == 0 { 556 + if recs >= 400_000 { 557 + if err := chClient.Do(ctx, ch.Query{ 558 + Body: insertQueryStr, 559 + Input: input, 560 + }); err != nil { 561 + log.Error("failed to insert records into ClickHouse", "err", err) 562 + } else { 563 + recordsProcessed.Add(int64(recs)) 564 + log.Info("inserted records into ClickHouse", "count", recs) 565 + } 566 + recs = 0 567 + input.Reset() 568 + } 569 + case <-t.C: 570 + if recs > 0 { 533 571 if err := chClient.Do(ctx, ch.Query{ 534 572 Body: insertQueryStr, 535 573 Input: input,