this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Refactor Supercollider to have Reload and Fire modes for higher burst emission (#253)

This change refactors Supercollider to support two different modes:
Reload and Fire.

In Reload mode, Supercollider generates a specified number of events as
quickly as it can and writes the CBOR to a file. It uses in-memory
SQLite for the repo manager and I've benched it at around 2.2k evt/sec
for generation.

The static file includes creation of the repos and all the repo appends
for lots of Posts. 2 Million events takes around 16 minutes to produce.
We call this a loaded magazine.

In Fire mode, Supercollider will fire the magazine of events at any
client that connects to the subscribe repos endpoint. Since we're
reading from a file, we can go as fast as the socket can handle, though
we also accept a flag to limit emission rate if we want to for any
reason.

This method allows us to get significantly higher rates (tested at >10k
evt/sec) of event emission for benchmarking a repo stream consumer like
the BGS.

You need to reload between firing only if the consumer has durable repo
state, if you blow away the DB on the consumer between runs, you don't
need to reload since we can play the repos over from creation again
without issue.

authored by

Jaz and committed by
GitHub
b16e4382 bf1ab30c

+418 -213
+1
.gitignore
··· 39 39 40 40 # Sonar cursor file 41 41 sonar_cursor.json 42 + out/
+18
Makefile
··· 90 90 .PHONY: sonar-up 91 91 sonar-up: # Runs sonar docker container 92 92 docker compose -f cmd/sonar/docker-compose.yml up --build -d || docker-compose -f cmd/sonar/docker-compose.yml up --build -d 93 + 94 + .PHONY: sc-reload 95 + sc-reload: # Reloads supercollider 96 + go run cmd/supercollider/main.go \ 97 + reload \ 98 + --port 6125 --total-events 2000000 \ 99 + --hostname alpha.supercollider.jazco.io \ 100 + --key-file out/alpha.pem \ 101 + --output-file out/alpha_in.cbor 102 + 103 + .PHONY: sc-fire 104 + sc-fire: # Fires supercollider 105 + go run cmd/supercollider/main.go \ 106 + fire \ 107 + --port 6125 --events-per-second 10000 \ 108 + --hostname alpha.supercollider.jazco.io \ 109 + --key-file out/alpha.pem \ 110 + --input-file out/alpha_in.cbor
+28
cmd/supercollider/Dockerfile
··· 1 + # Stage 1: Build the Go binary 2 + FROM golang:1.20.5 AS builder 3 + 4 + # Create a directory for the application 5 + WORKDIR /app 6 + 7 + # Fetch dependencies 8 + COPY go.mod go.sum ./ 9 + RUN go mod download 10 + 11 + COPY . . 12 + 13 + # Build the application 14 + RUN CGO_ENABLED=1 GOOS=linux go build -o /supercollider ./cmd/supercollider 15 + 16 + FROM alpine:latest as certs 17 + 18 + RUN apk --update add ca-certificates 19 + 20 + FROM debian:stable-slim 21 + 22 + COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt 23 + 24 + # Copy the binary from the first stage. 25 + COPY --from=builder /supercollider /supercollider 26 + 27 + # Set the startup command to run the binary 28 + CMD ["/supercollider"]
+358 -206
cmd/supercollider/main.go
··· 8 8 "os" 9 9 "os/signal" 10 10 "path/filepath" 11 - "strconv" 12 11 "strings" 13 12 "sync" 14 13 "syscall" ··· 28 27 "github.com/bluesky-social/indigo/plc" 29 28 "github.com/bluesky-social/indigo/util/version" 30 29 petname "github.com/dustinkirkland/golang-petname" 30 + "github.com/icrowley/fake" 31 31 "github.com/labstack/echo-contrib/pprof" 32 32 "github.com/urfave/cli/v2" 33 33 godid "github.com/whyrusleeping/go-did" ··· 43 43 "github.com/prometheus/client_golang/prometheus" 44 44 "github.com/prometheus/client_golang/prometheus/promauto" 45 45 "github.com/prometheus/client_golang/prometheus/promhttp" 46 - "gorm.io/driver/postgres" 47 46 "gorm.io/driver/sqlite" 48 47 "gorm.io/gorm" 48 + 49 + cbg "github.com/whyrusleeping/cbor-gen" 49 50 50 51 "go.uber.org/zap" 51 52 ) ··· 73 74 // Event Loop Parameters 74 75 TotalDesiredEvents int 75 76 MaxEventsPerSecond int 77 + 78 + PlaybackFile string 76 79 } 77 80 78 81 func main() { ··· 93 96 Value: "supercollider.jazco.io", 94 97 EnvVars: []string{"SUPERCOLLIDER_HOST"}, 95 98 }, 96 - &cli.StringFlag{ 97 - Name: "postgres-url", 98 - Usage: "postgres connection string for CarDB (if not set, will use sqlite in-memory)", 99 - EnvVars: []string{"SUPERCOLLIDER_POSTGRES_URL"}, 100 - }, 101 99 &cli.BoolFlag{ 102 - Name: "use-ssl", 103 - Usage: "listen on port 443 and use SSL (needs to be run as root and have external DNS setup)", 104 - Value: false, 100 + Name: "use-ssl", 101 + Usage: "listen on port 443 and use SSL (needs to be run as root and have external DNS setup)", 102 + Value: false, 103 + EnvVars: []string{"SUPERCOLLIDER_USE_SSL"}, 105 104 }, 106 105 &cli.IntFlag{ 107 - Name: "port", 108 - Usage: "port for the HTTP(S) server to listen on (defaults to 80 if not using SSL, 443 if using SSL)", 106 + Name: "port", 107 + Usage: "port for the HTTP(S) server to listen on (defaults to 80 if not using SSL, 443 if using SSL)", 108 + EnvVars: []string{"SUPERCOLLIDER_PORT"}, 109 109 }, 110 - &cli.IntFlag{ 111 - Name: "num-users", 112 - Usage: "number of fake users to produce events for", 113 - Value: 100, 110 + 111 + &cli.StringFlag{ 112 + Name: "key-file", 113 + Usage: "file to store the private key used to sign events", 114 + Value: "key.raw", 115 + EnvVars: []string{"KEY_FILE"}, 114 116 }, 115 - &cli.IntFlag{ 116 - Name: "events-per-second", 117 - Usage: "maximum number of events to generate per second", 118 - Value: 300, 117 + } 118 + 119 + app.Commands = []*cli.Command{ 120 + { 121 + Name: "reload", 122 + Usage: "reload events from a file and write them to an output file", 123 + Action: Reload, 124 + Flags: append([]cli.Flag{ 125 + &cli.IntFlag{ 126 + Name: "num-users", 127 + Usage: "number of fake users to produce events for", 128 + Value: 100, 129 + EnvVars: []string{"NUM_USERS"}, 130 + }, 131 + &cli.IntFlag{ 132 + Name: "total-events", 133 + Usage: "total number of events to generate", 134 + Value: 1_000_000, 135 + EnvVars: []string{"TOTAL_EVENTS"}, 136 + }, 137 + &cli.StringFlag{ 138 + Name: "output-file", 139 + Usage: "output file for the generated events", 140 + Value: "events_out.cbor", 141 + EnvVars: []string{"OUTPUT_FILE"}, 142 + }, 143 + }, app.Flags...), 119 144 }, 120 - &cli.IntFlag{ 121 - Name: "total-events-per-loop", 122 - Usage: "total number of events to generate per loop", 123 - Value: 1_000_000, 145 + { 146 + Name: "fire", 147 + Usage: "fire events from a file over a websocket", 148 + Action: Fire, 149 + Flags: append([]cli.Flag{ 150 + &cli.IntFlag{ 151 + Name: "events-per-second", 152 + Usage: "maximum number of events to generate per second", 153 + Value: 300, 154 + EnvVars: []string{"EVENTS_PER_SECOND"}, 155 + }, 156 + &cli.StringFlag{ 157 + Name: "input-file", 158 + Usage: "input file for the generated events (if set, will read events from this file instead of generating them)", 159 + Value: "events_in.cbor", 160 + EnvVars: []string{"INPUT_FILE"}, 161 + }, 162 + }, app.Flags...), 124 163 }, 125 164 } 126 - 127 - app.Action = Supercollider 128 165 129 166 err := app.Run(os.Args) 130 167 if err != nil { ··· 132 169 } 133 170 } 134 171 135 - func Supercollider(cctx *cli.Context) error { 172 + func Reload(cctx *cli.Context) error { 136 173 ctx := cctx.Context 137 174 ctx, cancel := context.WithCancel(ctx) 138 175 defer cancel() ··· 168 205 169 206 log := rawlog.Sugar().With("source", "supercollider_main") 170 207 171 - log.Info("starting supercollider") 208 + log.Info("Starting Supercollider in Reload Mode") 209 + log.Infof("Generating %d total events and writing them to %s", 210 + cctx.Int("total-events"), cctx.String("output-file")) 172 211 173 212 em := events.NewEventManager(events.NewYoloPersister()) 174 213 214 + // Try to read the key from disk 215 + keyBytes, err := os.ReadFile(cctx.String("key-file")) 216 + if err != nil { 217 + log.Warnf("failed to read key from disk, creating new key: %s", err.Error()) 218 + } 219 + 220 + var privkey *godid.PrivKey 221 + if len(keyBytes) == 0 { 222 + privkey, err = godid.GeneratePrivKey(rand.Reader, godid.KeyTypeSecp256k1) 223 + if err != nil { 224 + log.Fatalf("failed to generate privkey: %+v\n", err) 225 + } 226 + rawKey, err := privkey.RawBytes() 227 + if err != nil { 228 + log.Fatalf("failed to serialize privkey: %+v\n", err) 229 + } 230 + err = os.WriteFile(cctx.String("key-file"), rawKey, 0644) 231 + if err != nil { 232 + log.Fatalf("failed to write privkey to disk: %+v\n", err) 233 + } 234 + } else { 235 + privkey, err = godid.PrivKeyFromRawBytes(godid.KeyTypeSecp256k1, keyBytes) 236 + if err != nil { 237 + log.Fatalf("failed to parse privkey from disk: %+v\n", err) 238 + } 239 + } 240 + 175 241 // Configure the repomanager and keypair for our fake accounts 176 - repoman, privkey, err := initSpeedyRepoMan(cctx.String("postgres-url")) 242 + repoman, privkey, err := initSpeedyRepoMan(privkey) 177 243 if err != nil { 178 244 log.Fatalf("failed to init repo manager: %+v\n", err) 179 245 } ··· 201 267 Dids: dids, 202 268 203 269 Events: em, 204 - EventControl: make(chan string), 205 - TotalDesiredEvents: cctx.Int("total-events-per-loop"), 270 + TotalDesiredEvents: cctx.Int("total-events"), 271 + } 272 + 273 + repoman.SetEventHandler(s.HandleRepoEvent) 274 + 275 + // HTTP Server setup and Middleware Plumbing 276 + e := echo.New() 277 + e.AutoTLSManager.Cache = autocert.DirCache("/var/www/.cache") 278 + pprof.Register(e) 279 + e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{ 280 + Format: "method=${method}, ip=${remote_ip}, uri=${uri}, status=${status} latency=${latency_human} (ua=${user_agent})\n", 281 + })) 282 + 283 + e.GET("/", func(c echo.Context) error { 284 + return c.HTML(http.StatusOK, `<h1>Supercollider is reloading...</h1>`) 285 + }) 286 + e.GET("/metrics", echo.WrapHandler(promhttp.Handler())) 287 + 288 + port := cctx.Int("port") 289 + if port == 0 { 290 + if cctx.Bool("use-ssl") { 291 + port = 443 292 + } else { 293 + port = 80 294 + } 295 + } 296 + 297 + wg := sync.WaitGroup{} 298 + wg.Add(1) 299 + // Start a loop to subscribe to events and write them to a file 300 + go func() { 301 + defer wg.Done() 302 + outFile := cctx.String("output-file") 303 + f, err := os.OpenFile(outFile, os.O_CREATE|os.O_WRONLY, 0644) 304 + if err != nil { 305 + log.Fatalf("failed to open output file: %+v\n", err) 306 + } 307 + defer f.Close() 308 + since := int64(0) 309 + 310 + evts, cancel, err := s.Events.Subscribe(ctx, "supercollider_file", func(evt *events.XRPCStreamEvent) bool { 311 + return true 312 + }, &since) 313 + if err != nil { 314 + log.Fatalf("failed to subscribe to events: %+v\n", err) 315 + } 316 + defer cancel() 317 + 318 + log.Infof("writing events to %s", outFile) 319 + 320 + header := events.EventHeader{Op: events.EvtKindMessage} 321 + for { 322 + select { 323 + case <-ctx.Done(): 324 + log.Info("shutting down file writer") 325 + err = f.Sync() 326 + if err != nil { 327 + log.Errorf("failed to sync file: %+v\n", err) 328 + } 329 + log.Info("file writer shutdown complete") 330 + return 331 + case evt := <-evts: 332 + if evt.Error != nil { 333 + log.Errorf("error in event stream: %+v\n", evt.Error) 334 + continue 335 + } 336 + var obj lexutil.CBOR 337 + switch { 338 + case evt.Error != nil: 339 + header.Op = events.EvtKindErrorFrame 340 + obj = evt.Error 341 + case evt.RepoCommit != nil: 342 + header.MsgType = "#commit" 343 + obj = evt.RepoCommit 344 + case evt.RepoHandle != nil: 345 + header.MsgType = "#handle" 346 + obj = evt.RepoHandle 347 + case evt.RepoInfo != nil: 348 + header.MsgType = "#info" 349 + obj = evt.RepoInfo 350 + case evt.RepoMigrate != nil: 351 + header.MsgType = "#migrate" 352 + obj = evt.RepoMigrate 353 + case evt.RepoTombstone != nil: 354 + header.MsgType = "#tombstone" 355 + obj = evt.RepoTombstone 356 + default: 357 + log.Errorf("unrecognized event kind") 358 + continue 359 + } 360 + 361 + if err := header.MarshalCBOR(f); err != nil { 362 + log.Errorf("failed to write header: %+v\n", err) 363 + } 364 + 365 + if err := obj.MarshalCBOR(f); err != nil { 366 + log.Errorf("failed to write event: %+v\n", err) 367 + } 368 + } 369 + } 370 + }() 371 + 372 + // Start the event generation loop 373 + go func() { 374 + time.Sleep(time.Second * 5) 375 + s.EventGenerationLoop(ctx, cancel) 376 + }() 377 + 378 + listenAddress := fmt.Sprintf(":%d", port) 379 + go func() { 380 + if cctx.Bool("use-ssl") { 381 + err = e.StartAutoTLS(listenAddress) 382 + } else { 383 + err = e.Start(listenAddress) 384 + } 385 + if err != nil { 386 + log.Errorf("failed to start server: %+v\n", err) 387 + } 388 + }() 389 + <-ctx.Done() 390 + log.Info("shutting down server...") 391 + wg.Wait() 392 + log.Info("server shutdown complete") 393 + return nil 394 + } 395 + 396 + func Fire(cctx *cli.Context) error { 397 + ctx := cctx.Context 398 + ctx, cancel := context.WithCancel(ctx) 399 + defer cancel() 400 + 401 + // Trap SIGINT to trigger a shutdown. 402 + signals := make(chan os.Signal, 1) 403 + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 404 + 405 + go func() { 406 + select { 407 + case <-signals: 408 + cancel() 409 + fmt.Println("shutting down on signal") 410 + // Give the server some time to shutdown gracefully, then exit. 411 + time.Sleep(time.Second * 5) 412 + os.Exit(0) 413 + case <-ctx.Done(): 414 + fmt.Println("shutting down on context done") 415 + } 416 + }() 417 + 418 + rawlog, err := zap.NewDevelopment() 419 + if err != nil { 420 + log.Fatalf("failed to create logger: %+v\n", err) 421 + } 422 + defer func() { 423 + log.Printf("main function teardown\n") 424 + err := rawlog.Sync() 425 + if err != nil { 426 + log.Printf("failed to sync logger on teardown: %+v", err.Error()) 427 + } 428 + }() 429 + 430 + log := rawlog.Sugar().With("source", "supercollider_main") 431 + 432 + log.Info("Starting Supercollider in Fire Mode") 433 + 434 + // Try to read the key from disk 435 + keyBytes, err := os.ReadFile(cctx.String("key-file")) 436 + if err != nil { 437 + log.Warnf("failed to read key from disk, creating new key: %s", err.Error()) 438 + } 439 + 440 + var privkey *godid.PrivKey 441 + if len(keyBytes) == 0 { 442 + privkey, err = godid.GeneratePrivKey(rand.Reader, godid.KeyTypeSecp256k1) 443 + if err != nil { 444 + log.Fatalf("failed to generate privkey: %+v\n", err) 445 + } 446 + rawKey, err := privkey.RawBytes() 447 + if err != nil { 448 + log.Fatalf("failed to serialize privkey: %+v\n", err) 449 + } 450 + err = os.WriteFile(cctx.String("key-file"), rawKey, 0644) 451 + if err != nil { 452 + log.Fatalf("failed to write privkey to disk: %+v\n", err) 453 + } 454 + } else { 455 + privkey, err = godid.PrivKeyFromRawBytes(godid.KeyTypeSecp256k1, keyBytes) 456 + if err != nil { 457 + log.Fatalf("failed to parse privkey from disk: %+v\n", err) 458 + } 459 + } 460 + 461 + vMethod, err := godid.VerificationMethodFromKey(privkey.Public()) 462 + if err != nil { 463 + log.Fatalf("failed to generate verification method: %+v\n", err) 464 + } 465 + 466 + // Instantiate Server 467 + s := &Server{ 468 + Logger: log, 469 + EnableSSL: cctx.Bool("use-ssl"), 470 + Host: cctx.String("hostname"), 471 + MultibaseKey: *vMethod.PublicKeyMultibase, 206 472 MaxEventsPerSecond: cctx.Int("events-per-second"), 473 + PlaybackFile: cctx.String("input-file"), 207 474 } 208 475 209 476 // HTTP Server setup and Middleware Plumbing ··· 238 505 } 239 506 240 507 e.GET("/", func(c echo.Context) error { 241 - return c.HTML(http.StatusOK, `<h1>Welcome to Supercollider!</h1>`) 508 + return c.HTML(http.StatusOK, `<h1>Supercollider is firing...</h1>`) 242 509 }) 243 - 244 - repoman.SetEventHandler(s.HandleRepoEvent) 245 510 246 511 e.GET("/.well-known/did.json", s.HandleWellKnownDid) 247 512 e.GET("/.well-known/atproto-did", s.HandleAtprotoDid) 248 513 e.GET("/xrpc/com.atproto.server.describeServer", s.DescribeServerHandler) 249 514 e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.HandleSubscribeRepos) 250 515 e.GET("/metrics", echo.WrapHandler(promhttp.Handler())) 251 - e.GET("/generate", s.HandleStartGenerating) 252 - e.GET("/stop", s.HandleStopGenerating) 253 516 254 517 port := cctx.Int("port") 255 518 if port == 0 { ··· 260 523 } 261 524 } 262 525 263 - go s.EventGenerationLoop(ctx, cctx.String("postgres-url") != "") 264 - 265 526 listenAddress := fmt.Sprintf(":%d", port) 266 - if cctx.Bool("use-ssl") { 267 - err = e.StartAutoTLS(listenAddress) 268 - } else { 269 - err = e.Start(listenAddress) 270 - } 271 - if err != nil { 272 - log.Errorf("failed to start server: %+v\n", err) 273 - } 527 + go func() { 528 + if cctx.Bool("use-ssl") { 529 + err = e.StartAutoTLS(listenAddress) 530 + } else { 531 + err = e.Start(listenAddress) 532 + } 533 + if err != nil { 534 + log.Errorf("failed to start server: %+v\n", err) 535 + } 536 + }() 537 + <-ctx.Done() 538 + log.Info("shutting down server") 274 539 return nil 275 540 } 276 541 ··· 292 557 return db, nil 293 558 } 294 559 295 - // Configure a Postgres SqliteDB 296 - func setupPostgresDb(p string) (*gorm.DB, error) { 297 - db, err := gorm.Open(postgres.Open(p), &gorm.Config{}) 298 - if err != nil { 299 - return nil, fmt.Errorf("failed to open db: %w", err) 300 - } 301 - 302 - return db, nil 303 - } 304 - 305 560 // Stand up a Repo Manager with a Web DID Resolver 306 - func initSpeedyRepoMan(postgresString string) (*repomgr.RepoManager, *godid.PrivKey, error) { 561 + func initSpeedyRepoMan(key *godid.PrivKey) (*repomgr.RepoManager, *godid.PrivKey, error) { 307 562 dir, err := os.MkdirTemp("", "supercollider") 308 563 if err != nil { 309 564 return nil, nil, err 310 565 } 311 566 312 - var cardb *gorm.DB 313 - if postgresString != "" { 314 - cardb, err = setupPostgresDb(postgresString) 315 - if err != nil { 316 - return nil, nil, err 317 - } 318 - } else { 319 - cardb, err = setupDb("file::memory:?cache=shared") 320 - if err != nil { 321 - return nil, nil, err 322 - } 567 + cardb, err := setupDb("file::memory:?cache=shared") 568 + if err != nil { 569 + return nil, nil, err 323 570 } 324 571 325 572 cspath := filepath.Join(dir, "carstore") ··· 338 585 }) 339 586 340 587 cachedidr := plc.NewCachingDidResolver(mr, time.Minute*5, 1000) 341 - 342 - key, err := godid.GeneratePrivKey(rand.Reader, godid.KeyTypeSecp256k1) 343 - if err != nil { 344 - return nil, nil, err 345 - } 346 588 347 589 kmgr := indexer.NewKeyManager(cachedidr, key) 348 590 ··· 382 624 } 383 625 } 384 626 385 - // Event Generation Loop and Control 386 - 387 627 // EventGenerationLoop is the main loop for generating events 388 - func (s *Server) EventGenerationLoop(ctx context.Context, concurrent bool) { 389 - running := false 390 - totalEmittedEvents := 0 628 + func (s *Server) EventGenerationLoop(ctx context.Context, cancel context.CancelFunc) { 629 + defer cancel() 630 + s.Logger.Infof("starting event generation for %d events", s.TotalDesiredEvents) 391 631 392 - // We want to produce events at a maximum rate to prevent the buffer from overrunning 393 - limiter := rate.NewLimiter(rate.Limit(s.MaxEventsPerSecond), 10) 632 + s.Logger.Infof("initializing %d fake users", len(s.Dids)) 633 + for i, did := range s.Dids { 634 + uid := models.Uid(i + 1) 635 + if err := s.RepoManager.InitNewActor(ctx, uid, strings.TrimPrefix(did, "did:web:"), did, "catdog", "", ""); err != nil { 636 + log.Fatalf("failed to init actor: %+v\n", err) 637 + } 638 + } 394 639 395 - s.Logger.Infof("starting event generation loop with %d desired events and %d evt/s maximum\n", 396 - s.TotalDesiredEvents, s.MaxEventsPerSecond, 397 - ) 640 + s.Logger.Infof("generating %d events", s.TotalDesiredEvents) 398 641 399 - for { 642 + for i := 0; i < s.TotalDesiredEvents; i++ { 643 + text := fake.SentencesN(3) 644 + // Trim to 300 chars 645 + if len(text) > 300 { 646 + text = text[:300] 647 + } 648 + _, _, err := s.RepoManager.CreateRecord(ctx, models.Uid(i%len(s.Dids)+1), "app.bsky.feed.post", &bsky.FeedPost{ 649 + CreatedAt: time.Now().Format(util.ISO8601), 650 + Text: text, 651 + }) 652 + if err != nil { 653 + s.Logger.Errorf("failed to create record: %+v\n", err) 654 + } else { 655 + eventsGeneratedCounter.Inc() 656 + } 400 657 select { 401 658 case <-ctx.Done(): 659 + s.Logger.Infof("shutting down event generation loop on context done") 402 660 return 403 - case cmd := <-s.EventControl: 404 - switch cmd { 405 - case "start": 406 - running = true 407 - case "stop": 408 - running = false 409 - totalEmittedEvents = 0 410 - } 411 661 default: 412 - if !running { 413 - time.Sleep(time.Second) 414 - continue 415 - } 416 - for i, did := range s.Dids { 417 - uid := models.Uid(i + 1) 418 - if err := s.RepoManager.InitNewActor(ctx, uid, strings.TrimPrefix(did, "did:web:"), did, "catdog", "", ""); err != nil { 419 - log.Fatalf("failed to init actor: %+v\n", err) 420 - } 421 - } 422 - 423 - if concurrent { 424 - recordsPerActor := s.TotalDesiredEvents / len(s.Dids) 425 - wg := sync.WaitGroup{} 426 - for i := 0; i < len(s.Dids); i++ { 427 - wg.Add(1) 428 - go func(i int) { 429 - for j := 0; j < recordsPerActor; j++ { 430 - limiter.Wait(ctx) 431 - _, _, err := s.RepoManager.CreateRecord(ctx, models.Uid(i+1), "app.bsky.feed.post", &bsky.FeedPost{ 432 - Text: "cats", 433 - }) 434 - if err != nil { 435 - s.Logger.Errorf("failed to create record: %+v\n", err) 436 - } else { 437 - eventsGeneratedCounter.Inc() 438 - } 439 - select { 440 - case <-ctx.Done(): 441 - return 442 - default: 443 - } 444 - } 445 - }(i) 446 - } 447 - wg.Wait() 448 - } else { 449 - for i := 0; i < s.TotalDesiredEvents; i++ { 450 - limiter.Wait(ctx) 451 - _, _, err := s.RepoManager.CreateRecord(ctx, models.Uid(i%len(s.Dids)+1), "app.bsky.feed.post", &bsky.FeedPost{ 452 - Text: "cats", 453 - }) 454 - if err != nil { 455 - s.Logger.Errorf("failed to create record: %+v\n", err) 456 - } else { 457 - eventsGeneratedCounter.Inc() 458 - } 459 - select { 460 - case <-ctx.Done(): 461 - return 462 - default: 463 - } 464 - } 465 - } 466 - 467 - s.Logger.Infof("emitted %d events, stopping\n", totalEmittedEvents) 468 - s.EventControl <- "stop" 469 - break 470 662 } 471 663 } 472 - } 473 664 474 - // HandleStartGenerating starts the event generation loop 475 - func (s *Server) HandleStartGenerating(ctx echo.Context) error { 476 - s.EventControl <- "start" 477 - return ctx.String(200, "stream started") 478 - } 479 - 480 - // HandleStopGenerating stops the event generation loop 481 - func (s *Server) HandleStopGenerating(ctx echo.Context) error { 482 - s.EventControl <- "stop" 483 - return ctx.String(200, "stream stopped") 665 + s.Logger.Infof("event generation complete, shutting down") 666 + return 484 667 } 485 668 486 669 // ATProto Handlers for DID Web ··· 534 717 if err != nil { 535 718 return err 536 719 } 537 - 538 - var cursor *int64 539 - 540 - if c.QueryParam("cursor") != "" { 541 - cursorFromQuery, err := strconv.ParseInt(c.QueryParam("cursor"), 10, 64) 542 - if err != nil { 543 - return err 544 - } 545 - cursor = &cursorFromQuery 546 - } 720 + defer conn.Close() 547 721 548 722 ctx := c.Request().Context() 549 723 550 - ident := c.Request().RemoteAddr + "-" + c.Request().UserAgent() 724 + limiter := rate.NewLimiter(rate.Limit(s.MaxEventsPerSecond), 10) 551 725 552 - evts, cancel, err := s.Events.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { 553 - return true 554 - }, cursor) 726 + f, err := os.Open(s.PlaybackFile) 555 727 if err != nil { 728 + s.Logger.Errorf("failed to open playback file: %+v\n", err) 556 729 return err 557 730 } 558 - defer cancel() 731 + defer f.Close() 559 732 560 - header := events.EventHeader{Op: events.EvtKindMessage} 561 - for evt := range evts { 733 + header := cbg.Deferred{} 734 + obj := cbg.Deferred{} 735 + for { 562 736 wc, err := conn.NextWriter(websocket.BinaryMessage) 563 737 if err != nil { 564 738 return err 565 739 } 566 740 567 - var obj lexutil.CBOR 741 + limiter.Wait(ctx) 568 742 569 - switch { 570 - case evt.Error != nil: 571 - header.Op = events.EvtKindErrorFrame 572 - obj = evt.Error 573 - case evt.RepoCommit != nil: 574 - header.MsgType = "#commit" 575 - obj = evt.RepoCommit 576 - case evt.RepoHandle != nil: 577 - header.MsgType = "#handle" 578 - obj = evt.RepoHandle 579 - case evt.RepoInfo != nil: 580 - header.MsgType = "#info" 581 - obj = evt.RepoInfo 582 - case evt.RepoMigrate != nil: 583 - header.MsgType = "#migrate" 584 - obj = evt.RepoMigrate 585 - case evt.RepoTombstone != nil: 586 - header.MsgType = "#tombstone" 587 - obj = evt.RepoTombstone 588 - default: 589 - return fmt.Errorf("unrecognized event kind") 743 + if err := header.UnmarshalCBOR(f); err != nil { 744 + return fmt.Errorf("failed to read header: %w", err) 745 + } 746 + if err := obj.UnmarshalCBOR(f); err != nil { 747 + return fmt.Errorf("failed to read event: %w", err) 590 748 } 591 - 592 749 if err := header.MarshalCBOR(wc); err != nil { 593 750 return fmt.Errorf("failed to write header: %w", err) 594 751 } 595 - 596 752 if err := obj.MarshalCBOR(wc); err != nil { 597 753 return fmt.Errorf("failed to write event: %w", err) 598 754 } 599 - 600 755 if err := wc.Close(); err != nil { 601 756 return fmt.Errorf("failed to flush-close our event write: %w", err) 602 757 } 603 - 604 758 eventsSentCounter.Inc() 605 759 } 606 - 607 - return nil 608 760 }
+7 -7
events/metrics.go
··· 6 6 ) 7 7 8 8 var eventsFromStreamCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 9 - Name: "repo_stream_events_received_total", 9 + Name: "indigo_repo_stream_events_received_total", 10 10 Help: "Total number of events received from the stream", 11 11 }, []string{"remote_addr"}) 12 12 13 13 var bytesFromStreamCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 14 - Name: "repo_stream_bytes_total", 14 + Name: "indigo_repo_stream_bytes_total", 15 15 Help: "Total bytes received from the stream", 16 16 }, []string{"remote_addr"}) 17 17 18 18 var workItemsAdded = promauto.NewCounterVec(prometheus.CounterOpts{ 19 - Name: "work_items_added_total", 19 + Name: "indigo_work_items_added_total", 20 20 Help: "Total number of work items added to the consumer pool", 21 21 }, []string{"pool"}) 22 22 23 23 var workItemsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{ 24 - Name: "work_items_processed_total", 24 + Name: "indigo_work_items_processed_total", 25 25 Help: "Total number of work items processed by the consumer pool", 26 26 }, []string{"pool"}) 27 27 28 28 var workItemsActive = promauto.NewCounterVec(prometheus.CounterOpts{ 29 - Name: "work_items_active_total", 29 + Name: "indigo_work_items_active_total", 30 30 Help: "Total number of work items passed into a worker", 31 31 }, []string{"pool"}) 32 32 33 33 var eventsEnqueued = promauto.NewCounterVec(prometheus.CounterOpts{ 34 - Name: "events_enqueued_for_broadcast_total", 34 + Name: "indigo_events_enqueued_for_broadcast_total", 35 35 Help: "Total number of events enqueued to broadcast to subscribers", 36 36 }, []string{"pool"}) 37 37 38 38 var eventsBroadcast = promauto.NewCounterVec(prometheus.CounterOpts{ 39 - Name: "events_broadcast_total", 39 + Name: "indigo_events_broadcast_total", 40 40 Help: "Total number of events broadcast to subscribers", 41 41 }, []string{"pool"})
+2
go.mod
··· 12 12 github.com/gorilla/websocket v1.5.0 13 13 github.com/hashicorp/go-retryablehttp v0.7.2 14 14 github.com/hashicorp/golang-lru v0.5.4 15 + github.com/icrowley/fake v0.0.0-20221112152111-d7b7e2276db2 15 16 github.com/ipfs/go-block-format v0.1.2 16 17 github.com/ipfs/go-bs-sqlite3 v0.0.0-20221122195556-bfcee1be620d 17 18 github.com/ipfs/go-cid v0.4.1 ··· 64 65 github.com/beorn7/perks v1.0.1 // indirect 65 66 github.com/cenkalti/backoff/v4 v4.2.1 // indirect 66 67 github.com/cespare/xxhash/v2 v2.2.0 // indirect 68 + github.com/corpix/uarand v0.0.0-20170723150923-031be390f409 // indirect 67 69 github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect 68 70 github.com/davecgh/go-spew v1.1.1 // indirect 69 71 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
+4
go.sum
··· 87 87 github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= 88 88 github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= 89 89 github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= 90 + github.com/corpix/uarand v0.0.0-20170723150923-031be390f409 h1:9A+mfQmwzZ6KwUXPc8nHxFtKgn9VIvO3gXAOspIcE3s= 91 + github.com/corpix/uarand v0.0.0-20170723150923-031be390f409/go.mod h1:JSm890tOkDN+M1jqN8pUGDKnzJrsVbJwSMHBY4zwz7M= 90 92 github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= 91 93 github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= 92 94 github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= ··· 229 231 github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= 230 232 github.com/huin/goupnp v1.0.3 h1:N8No57ls+MnjlB+JPiCVSOyy/ot7MJTqlo7rn+NYSqQ= 231 233 github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= 234 + github.com/icrowley/fake v0.0.0-20221112152111-d7b7e2276db2 h1:qU3v73XG4QAqCPHA4HOpfC1EfUvtLIDvQK4mNQ0LvgI= 235 + github.com/icrowley/fake v0.0.0-20221112152111-d7b7e2276db2/go.mod h1:dQ6TM/OGAe+cMws81eTe4Btv1dKxfPZ2CX+YaAFAPN4= 232 236 github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= 233 237 github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= 234 238 github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA=