···388388 },
389389 }
390390391391- pool := events.NewConsumerPool(32, 20, rsc.EventHandler)
391391+ pool := events.NewConsumerPool(32, 20, con.RemoteAddr().String(), rsc.EventHandler)
392392 return events.HandleRepoStream(ctx, con, pool)
393393}
394394···399399 return nil
400400}
401401402402+type cursorSnapshot struct {
403403+ id uint
404404+ cursor int64
405405+}
406406+402407// flushCursors updates the PDS cursors in the DB for all active subscriptions
403408func (s *Slurper) flushCursors(ctx context.Context) []error {
404409 ctx, span := otel.Tracer("feedmgr").Start(ctx, "flushCursors")
405410 defer span.End()
406411412412+ var cursors []cursorSnapshot
413413+407414 s.lk.Lock()
408408- defer s.lk.Unlock()
415415+ // Iterate over active subs and copy the current cursor
416416+ for _, sub := range s.active {
417417+ sub.lk.RLock()
418418+ cursors = append(cursors, cursorSnapshot{
419419+ id: sub.pds.ID,
420420+ cursor: sub.pds.Cursor,
421421+ })
422422+ sub.lk.RUnlock()
423423+ }
424424+ s.lk.Unlock()
409425410426 errs := []error{}
411427412412- // Iterate over active subs and update the PDS cursor in the DB
413413- for _, sub := range s.active {
414414- sub.lk.RLock()
415415- if err := s.db.WithContext(ctx).Model(models.PDS{}).Where("id = ?", sub.pds.ID).UpdateColumn("cursor", sub.pds.Cursor).Error; err != nil {
428428+ tx := s.db.WithContext(ctx).Begin()
429429+ for _, cursor := range cursors {
430430+ if err := tx.WithContext(ctx).Model(models.PDS{}).Where("id = ?", cursor.id).UpdateColumn("cursor", cursor.cursor).Error; err != nil {
416431 errs = append(errs, err)
417432 }
418418- sub.lk.RUnlock()
433433+ }
434434+ if err := tx.WithContext(ctx).Commit().Error; err != nil {
435435+ errs = append(errs, err)
419436 }
420437421438 return errs
+18-17
carstore/bs.go
···69697070 Root models.DbCID `gorm:"index"`
7171 DataStart int64
7272- Seq int `gorm:"index"`
7272+ Seq int `gorm:"index:idx_car_shards_seq;index:idx_car_shards_usr_seq,priority:2,sort:desc"`
7373 Path string
7474- Usr models.Uid `gorm:"index"`
7474+ Usr models.Uid `gorm:"index:idx_car_shards_usr;index:idx_car_shards_usr_seq,priority:1"`
7575 Rebase bool
7676}
7777···275275276276 var lastShard CarShard
277277 // this is often slow (which is why we're caching it) but could be sped up with an extra index:
278278- // CREATE INDEX idx_car_shards_usr_id ON car_shards (usr, id DESC);
279279- if err := cs.meta.WithContext(ctx).Model(CarShard{}).Limit(1).Order("id desc").Find(&lastShard, "usr = ?", user).Error; err != nil {
278278+ // CREATE INDEX idx_car_shards_usr_id ON car_shards (usr, seq DESC);
279279+ if err := cs.meta.WithContext(ctx).Model(CarShard{}).Limit(1).Order("seq desc").Find(&lastShard, "usr = ?", user).Error; err != nil {
280280 //if err := cs.meta.Model(CarShard{}).Where("user = ?", user).Last(&lastShard).Error; err != nil {
281281 //if err != gorm.ErrRecordNotFound {
282282 return nil, err
···653653654654 // TODO: there should be a way to create the shard and block_refs that
655655 // reference it in the same query, would save a lot of time
656656- if err := ds.cs.meta.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
657657- if err := tx.WithContext(ctx).Create(shard).Error; err != nil {
658658- return fmt.Errorf("failed to create shard in DB tx: %w", err)
659659- }
660660- ds.cs.putLastShardCache(ds.user, shard)
656656+ tx := ds.cs.meta.WithContext(ctx).Begin()
661657662662- for _, ref := range brefs {
663663- ref["shard"] = shard.ID
664664- }
658658+ if err := tx.WithContext(ctx).Create(shard).Error; err != nil {
659659+ return fmt.Errorf("failed to create shard in DB tx: %w", err)
660660+ }
661661+ ds.cs.putLastShardCache(ds.user, shard)
662662+663663+ for _, ref := range brefs {
664664+ ref["shard"] = shard.ID
665665+ }
665666666666- if err := createBlockRefs(ctx, tx, brefs); err != nil {
667667- return fmt.Errorf("failed to create block refs: %w", err)
668668- }
667667+ if err := createBlockRefs(ctx, tx, brefs); err != nil {
668668+ return fmt.Errorf("failed to create block refs: %w", err)
669669+ }
669670670670- return nil
671671- }); err != nil {
671671+ err := tx.WithContext(ctx).Commit().Error
672672+ if err != nil {
672673 return fmt.Errorf("failed to commit shard DB transaction: %w", err)
673674 }
674675
+1-1
cmd/sonar/main.go
···107107108108 wg := sync.WaitGroup{}
109109110110- pool := events.NewConsumerPool(cctx.Int("worker-count"), cctx.Int("max-queue-size"), s.HandleStreamEvent)
110110+ pool := events.NewConsumerPool(cctx.Int("worker-count"), cctx.Int("max-queue-size"), u.Host, s.HandleStreamEvent)
111111112112 // Start a goroutine to manage the cursor file, saving the current cursor every 5 seconds.
113113 go func() {
+76-28
cmd/supercollider/main.go
···1010 "path/filepath"
1111 "strconv"
1212 "strings"
1313+ "sync"
1314 "syscall"
1415 "time"
1516···4243 "github.com/prometheus/client_golang/prometheus"
4344 "github.com/prometheus/client_golang/prometheus/promauto"
4445 "github.com/prometheus/client_golang/prometheus/promhttp"
4646+ "gorm.io/driver/postgres"
4547 "gorm.io/driver/sqlite"
4648 "gorm.io/gorm"
4749···9092 Usage: "hostname of this server (forward *.hostname DNS records to this server)",
9193 Value: "supercollider.jazco.io",
9294 EnvVars: []string{"SUPERCOLLIDER_HOST"},
9595+ },
9696+ &cli.StringFlag{
9797+ Name: "postgres-url",
9898+ Usage: "postgres connection string for CarDB (if not set, will use sqlite in-memory)",
9999+ EnvVars: []string{"SUPERCOLLIDER_POSTGRES_URL"},
93100 },
94101 &cli.BoolFlag{
95102 Name: "use-ssl",
···166173 em := events.NewEventManager(events.NewYoloPersister())
167174168175 // Configure the repomanager and keypair for our fake accounts
169169- repoman, privkey, err := initSpeedyRepoMan()
176176+ repoman, privkey, err := initSpeedyRepoMan(cctx.String("postgres-url"))
170177 if err != nil {
171178 log.Fatalf("failed to init repo manager: %+v\n", err)
172179 }
···253260 }
254261 }
255262256256- go s.EventGenerationLoop(ctx)
263263+ go s.EventGenerationLoop(ctx, cctx.String("postgres-url") != "")
257264258265 listenAddress := fmt.Sprintf(":%d", port)
259266 if cctx.Bool("use-ssl") {
···285292 return db, nil
286293}
287294295295+// Configure a Postgres SqliteDB
296296+func setupPostgresDb(p string) (*gorm.DB, error) {
297297+ db, err := gorm.Open(postgres.Open(p), &gorm.Config{})
298298+ if err != nil {
299299+ return nil, fmt.Errorf("failed to open db: %w", err)
300300+ }
301301+302302+ return db, nil
303303+}
304304+288305// Stand up a Repo Manager with a Web DID Resolver
289289-func initSpeedyRepoMan() (*repomgr.RepoManager, *godid.PrivKey, error) {
306306+func initSpeedyRepoMan(postgresString string) (*repomgr.RepoManager, *godid.PrivKey, error) {
290307 dir, err := os.MkdirTemp("", "supercollider")
291308 if err != nil {
292309 return nil, nil, err
293310 }
294311295295- cardb, err := setupDb("file::memory:?cache=shared")
296296- if err != nil {
297297- return nil, nil, err
312312+ var cardb *gorm.DB
313313+ if postgresString != "" {
314314+ cardb, err = setupPostgresDb(postgresString)
315315+ if err != nil {
316316+ return nil, nil, err
317317+ }
318318+ } else {
319319+ cardb, err = setupDb("file::memory:?cache=shared")
320320+ if err != nil {
321321+ return nil, nil, err
322322+ }
298323 }
299324300325 cspath := filepath.Join(dir, "carstore")
···362387// Event Generation Loop and Control
363388364389// EventGenerationLoop is the main loop for generating events
365365-func (s *Server) EventGenerationLoop(ctx context.Context) {
390390+func (s *Server) EventGenerationLoop(ctx context.Context, concurrent bool) {
366391 running := false
367392 totalEmittedEvents := 0
368393···397422 }
398423 }
399424400400- for i := 0; i < s.TotalDesiredEvents; i++ {
401401- totalEmittedEvents++
402402- if i%40_000 == 0 {
403403- s.Logger.Infof("emitted %d events\n", totalEmittedEvents)
404404- }
405405-406406- // Wait for the limiter to allow us to emit another event
407407- limiter.Wait(ctx)
408408-409409- _, _, err := s.RepoManager.CreateRecord(ctx, models.Uid(i%len(s.Dids)+1), "app.bsky.feed.post", &bsky.FeedPost{
410410- Text: "cats",
411411- })
412412- if err != nil {
413413- s.Logger.Errorf("failed to create record: %+v\n", err)
414414- } else {
415415- eventsGeneratedCounter.Inc()
425425+ if concurrent {
426426+ recordsPerActor := s.TotalDesiredEvents / len(s.Dids)
427427+ wg := sync.WaitGroup{}
428428+ for i := 0; i < len(s.Dids); i++ {
429429+ wg.Add(1)
430430+ go func(i int) {
431431+ for j := 0; j < recordsPerActor; j++ {
432432+ limiter.Wait(ctx)
433433+ _, _, err := s.RepoManager.CreateRecord(ctx, models.Uid(i+1), "app.bsky.feed.post", &bsky.FeedPost{
434434+ Text: "cats",
435435+ })
436436+ if err != nil {
437437+ s.Logger.Errorf("failed to create record: %+v\n", err)
438438+ } else {
439439+ eventsGeneratedCounter.Inc()
440440+ }
441441+ select {
442442+ case <-ctx.Done():
443443+ return
444444+ default:
445445+ }
446446+ }
447447+ }(i)
416448 }
417417- select {
418418- case <-ctx.Done():
419419- return
420420- default:
449449+ wg.Wait()
450450+ } else {
451451+ for i := 0; i < s.TotalDesiredEvents; i++ {
452452+ limiter.Wait(ctx)
453453+ _, _, err := s.RepoManager.CreateRecord(ctx, models.Uid(i%len(s.Dids)+1), "app.bsky.feed.post", &bsky.FeedPost{
454454+ Text: "cats",
455455+ })
456456+ if err != nil {
457457+ s.Logger.Errorf("failed to create record: %+v\n", err)
458458+ } else {
459459+ eventsGeneratedCounter.Inc()
460460+ }
461461+ select {
462462+ case <-ctx.Done():
463463+ return
464464+ default:
465465+ }
421466 }
422467 }
468468+423469 s.Logger.Infof("emitted %d events, stopping\n", totalEmittedEvents)
424470 s.EventControl <- "stop"
425471 break
···503549504550 ctx := c.Request().Context()
505551506506- evts, cancel, err := s.Events.Subscribe(ctx, func(evt *events.XRPCStreamEvent) bool {
552552+ ident := c.Request().RemoteAddr + "-" + c.Request().UserAgent()
553553+554554+ evts, cancel, err := s.Events.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
507555 return true
508556 }, cursor)
509557 if err != nil {
···11+package events
22+33+import (
44+ "github.com/prometheus/client_golang/prometheus"
55+ "github.com/prometheus/client_golang/prometheus/promauto"
66+)
77+88+var eventsFromStreamCounter = promauto.NewCounterVec(prometheus.CounterOpts{
99+ Name: "repo_stream_events_received_total",
1010+ Help: "Total number of events received from the stream",
1111+}, []string{"remote_addr"})
1212+1313+var bytesFromStreamCounter = promauto.NewCounterVec(prometheus.CounterOpts{
1414+ Name: "repo_stream_bytes_total",
1515+ Help: "Total bytes received from the stream",
1616+}, []string{"remote_addr"})
1717+1818+var workItemsAdded = promauto.NewCounterVec(prometheus.CounterOpts{
1919+ Name: "work_items_added_total",
2020+ Help: "Total number of work items added to the consumer pool",
2121+}, []string{"pool"})
2222+2323+var workItemsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
2424+ Name: "work_items_processed_total",
2525+ Help: "Total number of work items processed by the consumer pool",
2626+}, []string{"pool"})
2727+2828+var workItemsActive = promauto.NewCounterVec(prometheus.CounterOpts{
2929+ Name: "work_items_active_total",
3030+ Help: "Total number of work items passed into a worker",
3131+}, []string{"pool"})
3232+3333+var eventsEnqueued = promauto.NewCounterVec(prometheus.CounterOpts{
3434+ Name: "events_enqueued_for_broadcast_total",
3535+ Help: "Total number of events enqueued to broadcast to subscribers",
3636+}, []string{"pool"})
3737+3838+var eventsBroadcast = promauto.NewCounterVec(prometheus.CounterOpts{
3939+ Name: "events_broadcast_total",
4040+ Help: "Total number of events broadcast to subscribers",
4141+}, []string{"pool"})
+8-1
events/parallel.go
···27272828 lk sync.Mutex
2929 active map[string][]*consumerTask
3030+3131+ ident string
3032}
31333232-func NewConsumerPool(maxC, maxQ int, do func(context.Context, *XRPCStreamEvent) error) *ParallelConsumerPool {
3434+func NewConsumerPool(maxC, maxQ int, ident string, do func(context.Context, *XRPCStreamEvent) error) *ParallelConsumerPool {
3335 p := &ParallelConsumerPool{
3436 maxConcurrency: maxC,
3537 maxQueue: maxQ,
···38403941 feeder: make(chan *consumerTask),
4042 active: make(map[string][]*consumerTask),
4343+4444+ ident: ident,
4145 }
42464347 for i := 0; i < maxC; i++ {
···5357}
54585559func (p *ParallelConsumerPool) AddWork(ctx context.Context, repo string, val *XRPCStreamEvent) error {
6060+ workItemsAdded.WithLabelValues(p.ident).Inc()
5661 t := &consumerTask{
5762 repo: repo,
5863 val: val,
···8085func (p *ParallelConsumerPool) worker() {
8186 for work := range p.feeder {
8287 for work != nil {
8888+ workItemsActive.WithLabelValues(p.ident).Inc()
8389 if err := p.do(context.TODO(), work.val); err != nil {
8490 log.Errorf("event handler failed: %s", err)
8591 }
9292+ workItemsProcessed.WithLabelValues(p.ident).Inc()
86938794 p.lk.Lock()
8895 rem, ok := p.active[work.repo]