this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Update netsync to make it a bit less messy and more compressed (#622)

authored by

Jaz and committed by
GitHub
2c95341c 733547c2

+126 -1390
+1 -5
Makefile
··· 120 120 121 121 .PHONY: run-netsync 122 122 run-netsync: .env ## Runs netsync for local dev 123 - go run ./cmd/netsync --checkout-limit 30 --worker-count 60 --out-dir ../netsync-out 123 + go run ./cmd/netsync --checkout-limit 100 --worker-count 100 --out-dir ../netsync-out 124 124 125 125 SCYLLA_VERSION := latest 126 126 SCYLLA_CPU := 0 127 127 SCYLLA_NODES := 127.0.0.1:9042 128 - 129 - .PHONY: netsync-playback 130 - netsync-playback: .env ## Runs netsync for local dev 131 - go run ./cmd/netsync --worker-count 96 --out-dir ../netsync-out_2023_08_25 playback --scylla-nodes $(SCYLLA_NODES) 132 128 133 129 .PHONY: run-scylla 134 130 run-scylla:
+125 -72
cmd/netsync/main.go
··· 1 1 package main 2 2 3 3 import ( 4 + "archive/tar" 4 5 "bufio" 6 + "compress/gzip" 5 7 "context" 6 8 "encoding/json" 7 9 "fmt" 8 10 "io" 11 + "log" 12 + "log/slog" 9 13 "net/http" 10 14 "os" 11 15 "os/signal" 16 + "path/filepath" 12 17 "strings" 13 18 "sync" 14 19 "syscall" 15 20 "time" 16 21 17 - logging "github.com/ipfs/go-log" 22 + "github.com/bluesky-social/indigo/atproto/data" 23 + "github.com/bluesky-social/indigo/repo" 24 + "github.com/ipfs/go-cid" 18 25 _ "github.com/joho/godotenv/autoload" 19 26 "github.com/prometheus/client_golang/prometheus" 20 27 "github.com/prometheus/client_golang/prometheus/promauto" ··· 25 32 "github.com/carlmjohnson/versioninfo" 26 33 "github.com/urfave/cli/v2" 27 34 ) 28 - 29 - var log = logging.Logger("netsync") 30 35 31 36 func main() { 32 37 app := cli.App{ ··· 69 74 &cli.StringFlag{ 70 75 Name: "checkout-path", 71 76 Usage: "path to checkout endpoint", 72 - Value: "https://bgs.bsky.social/xrpc/com.atproto.sync.getRepo", 77 + Value: "https://bsky.network/xrpc/com.atproto.sync.getRepo", 73 78 }, 74 79 &cli.StringFlag{ 75 80 Name: "magic-header-key", ··· 114 119 return state.Save() 115 120 }, 116 121 }, 117 - { 118 - Name: "playback", 119 - Usage: "playback the contents of a netsync output directory", 120 - Action: Playback, 121 - Flags: []cli.Flag{ 122 - &cli.StringSliceFlag{ 123 - Name: "scylla-nodes", 124 - Usage: "list of scylla nodes to connect to", 125 - EnvVars: []string{"SCYLLA_NODES"}, 126 - }, 127 - }, 128 - }, 129 - { 130 - Name: "query", 131 - Usage: "run a test query against scylla", 132 - Action: Query, 133 - Flags: []cli.Flag{ 134 - &cli.StringSliceFlag{ 135 - Name: "scylla-nodes", 136 - Usage: "list of scylla nodes to connect to", 137 - EnvVars: []string{"SCYLLA_NODES"}, 138 - }, 139 - }, 140 - }, 141 - { 142 - Name: "getPostsForUser", 143 - Usage: "run a test query against scylla", 144 - Action: GetPostsForUser, 145 - Flags: []cli.Flag{ 146 - &cli.StringSliceFlag{ 147 - Name: "scylla-nodes", 148 - Usage: "list of scylla nodes to connect to", 149 - EnvVars: []string{"SCYLLA_NODES"}, 150 - }, 151 - }, 152 - }, 153 122 } 154 123 155 124 app.Action = Netsync ··· 176 145 magicHeaderKey string 177 146 magicHeaderVal string 178 147 148 + logger *slog.Logger 149 + 179 150 lk sync.RWMutex 180 151 wg sync.WaitGroup 181 152 exit chan struct{} ··· 294 265 ctx, cancel := context.WithCancel(ctx) 295 266 defer cancel() 296 267 268 + logLevel := slog.LevelInfo 269 + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: logLevel, AddSource: true})) 270 + slog.SetDefault(slog.New(logger.Handler())) 271 + 297 272 state := &NetsyncState{ 298 273 StatePath: cctx.String("state-file"), 299 274 CheckoutPath: cctx.String("checkout-path"), ··· 309 284 client: &http.Client{ 310 285 Timeout: 180 * time.Second, 311 286 }, 287 + 288 + logger: logger, 312 289 } 313 290 314 291 if state.magicHeaderKey != "" && state.magicHeaderVal != "" { 315 - log.Info("using magic header") 292 + logger.Info("using magic header") 316 293 } 317 294 318 295 // Create out dir ··· 356 333 } 357 334 } 358 335 } else { 359 - log.Info("Resuming from state file") 336 + logger.Info("Resuming from state file") 360 337 } 361 338 362 339 // Start metrics server ··· 372 349 state.wg.Add(1) 373 350 defer state.wg.Done() 374 351 if err := metricsServer.ListenAndServe(); err != http.ErrServerClosed { 375 - log.Fatalf("failed to start metrics server: %+v", err) 352 + logger.Error("failed to start metrics server", "err", err) 353 + os.Exit(1) 376 354 } 377 - log.Info("metrics server shut down successfully") 355 + logger.Info("metrics server shut down successfully") 378 356 }() 379 357 380 358 // Start workers ··· 384 362 defer state.wg.Done() 385 363 err := state.worker(id) 386 364 if err != nil { 387 - log.Errorw("worker failed", "err", err) 365 + logger.Error("worker failed", "err", err) 388 366 } 389 367 }(i) 390 368 } ··· 399 377 case <-ctx.Done(): 400 378 err := state.Save() 401 379 if err != nil { 402 - log.Errorw("failed to save state", "err", err) 380 + logger.Error("failed to save state", "err", err) 403 381 } 404 382 return 405 383 case <-t.C: 406 384 err := state.Save() 407 385 if err != nil { 408 - log.Errorw("failed to save state", "err", err) 386 + logger.Error("failed to save state", "err", err) 409 387 } 410 388 state.lk.RLock() 411 389 if len(state.EnqueuedRepos) == 0 { 412 - log.Info("no more repos to clone, shutting down") 390 + logger.Info("no more repos to clone, shutting down") 413 391 close(state.exit) 414 392 return 415 393 } ··· 419 397 }() 420 398 421 399 // Trap SIGINT to trigger a shutdown. 422 - log.Info("listening for signals") 400 + logger.Info("listening for signals") 423 401 signals := make(chan os.Signal, 1) 424 402 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 425 403 ··· 427 405 case sig := <-signals: 428 406 cancel() 429 407 close(state.exit) 430 - log.Infof("shutting down on signal: %+v", sig) 408 + logger.Info("shutting down on signal", "signal", sig) 431 409 case <-ctx.Done(): 432 410 cancel() 433 411 close(state.exit) 434 - log.Info("shutting down on context done") 412 + logger.Info("shutting down on context done") 435 413 case <-state.exit: 436 414 cancel() 437 - log.Info("shutting down on exit signal") 415 + logger.Info("shutting down on exit signal") 438 416 } 439 417 440 - log.Info("shutting down, waiting for workers to clean up...") 418 + logger.Info("shutting down, waiting for workers to clean up...") 441 419 442 420 if err := metricsServer.Shutdown(ctx); err != nil { 443 - log.Errorf("failed to shut down metrics server: %+v", err) 421 + logger.Error("failed to shut down metrics server", "err", err) 444 422 } 445 423 446 424 state.wg.Wait() 447 425 448 - log.Info("shut down successfully") 426 + logger.Info("shut down successfully") 449 427 450 428 return nil 451 429 452 430 } 453 431 454 432 func (s *NetsyncState) worker(id int) error { 455 - log := log.With("worker", id) 456 - log.Infow("starting worker") 457 - defer log.Infow("worker stopped") 433 + log := s.logger.With("worker", id) 434 + log.Info("starting worker") 435 + defer log.Info("worker stopped") 458 436 for { 459 437 select { 460 438 case <-s.exit: ··· 475 453 // Clone repo 476 454 cloneState, err := s.cloneRepo(ctx, repo) 477 455 if err != nil { 478 - log.Errorw("failed to clone repo", "repo", repo, "err", err) 456 + log.Error("failed to clone repo", "repo", repo, "err", err) 479 457 } 480 458 481 459 // Update state 482 460 s.Finish(repo, cloneState) 483 - log.Infow("worker finished", "repo", repo, "status", cloneState) 461 + log.Info("worker finished", "repo", repo, "status", cloneState) 484 462 } 485 463 } 486 464 } ··· 495 473 Help: "Number of bytes processed", 496 474 }) 497 475 498 - func (s *NetsyncState) cloneRepo(ctx context.Context, repo string) (cloneState string, err error) { 499 - log := log.With("repo", repo, "source", "cloneRepo") 500 - log.Infow("cloning repo") 476 + func (s *NetsyncState) cloneRepo(ctx context.Context, did string) (cloneState string, err error) { 477 + log := s.logger.With("repo", did, "source", "cloneRepo") 478 + log.Info("cloning repo") 501 479 502 480 start := time.Now() 503 481 defer func() { ··· 505 483 repoCloneDuration.WithLabelValues(cloneState).Observe(duration.Seconds()) 506 484 }() 507 485 508 - var url = fmt.Sprintf("%s?did=%s", s.CheckoutPath, repo) 486 + var url = fmt.Sprintf("%s?did=%s", s.CheckoutPath, did) 509 487 510 488 // Clone repo 511 489 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) ··· 538 516 defer instrumentedReader.Close() 539 517 540 518 // Write to file 541 - outPath := fmt.Sprintf("%s/%s", s.outDir, repo) 542 - outFile, err := os.OpenFile(outPath, os.O_CREATE|os.O_WRONLY, 0644) 519 + outPath, err := filepath.Abs(fmt.Sprintf("%s/%s.tar.gz", s.outDir, did)) 520 + if err != nil { 521 + cloneState = "failed (file.abs)" 522 + return cloneState, fmt.Errorf("failed to get absolute path: %w", err) 523 + } 524 + 525 + tarFile, err := os.OpenFile(outPath, os.O_CREATE|os.O_WRONLY, 0644) 543 526 if err != nil { 544 527 cloneState = "failed (file.open)" 545 528 return cloneState, fmt.Errorf("failed to open file: %w", err) 546 529 } 530 + defer tarFile.Close() 547 531 548 - _, err = io.Copy(outFile, instrumentedReader) 532 + gzipWriter := gzip.NewWriter(tarFile) 533 + defer gzipWriter.Close() 534 + 535 + tarWriter := tar.NewWriter(gzipWriter) 536 + defer tarWriter.Close() 537 + 538 + numRecords := 0 539 + collectionsSeen := make(map[string]struct{}) 540 + 541 + r, err := repo.ReadRepoFromCar(ctx, instrumentedReader) 549 542 if err != nil { 550 - cloneState = "failed (file.copy)" 551 - return cloneState, fmt.Errorf("failed to copy file: %w", err) 543 + log.Error("Error reading repo", "err", err) 544 + return "failed (read-repo)", fmt.Errorf("Failed to read repo from CAR: %w", err) 552 545 } 553 546 554 - err = outFile.Close() 547 + err = r.ForEach(ctx, "", func(path string, nodeCid cid.Cid) error { 548 + log := log.With("path", path, "nodeCid", nodeCid) 549 + 550 + recordCid, rec, err := r.GetRecordBytes(ctx, path) 551 + if err != nil { 552 + log.Error("Error getting record", "err", err) 553 + return nil 554 + } 555 + 556 + // Verify that the record CID matches the node CID 557 + if recordCid != nodeCid { 558 + log.Error("Mismatch in record and node CID", "recordCID", recordCid, "nodeCID", nodeCid) 559 + return nil 560 + } 561 + 562 + parts := strings.Split(path, "/") 563 + if len(parts) != 2 { 564 + log.Error("Path does not have 2 parts", "path", path) 565 + return nil 566 + } 567 + 568 + collection := parts[0] 569 + rkey := parts[1] 570 + 571 + numRecords++ 572 + if _, ok := collectionsSeen[collection]; !ok { 573 + collectionsSeen[collection] = struct{}{} 574 + } 575 + 576 + asCbor, err := data.UnmarshalCBOR(*rec) 577 + if err != nil { 578 + log.Error("Error unmarshalling record", "err", err) 579 + return fmt.Errorf("Failed to unmarshal record: %w", err) 580 + } 581 + 582 + recJSON, err := json.Marshal(asCbor) 583 + if err != nil { 584 + log.Error("Error marshalling record to JSON", "err", err) 585 + return fmt.Errorf("Failed to marshal record to JSON: %w", err) 586 + } 587 + 588 + // Write the record directly to the tar.gz file 589 + hdr := &tar.Header{ 590 + Name: fmt.Sprintf("%s/%s.json", collection, rkey), 591 + Mode: 0600, 592 + Size: int64(len(recJSON)), 593 + } 594 + if err := tarWriter.WriteHeader(hdr); err != nil { 595 + log.Error("Error writing tar header", "err", err) 596 + return err 597 + } 598 + if _, err := tarWriter.Write(recJSON); err != nil { 599 + log.Error("Error writing record to tar file", "err", err) 600 + return err 601 + } 602 + 603 + return nil 604 + }) 555 605 if err != nil { 556 - cloneState = "failed (file.close)" 557 - return cloneState, fmt.Errorf("failed to close file: %w", err) 606 + log.Error("Error during ForEach", "err", err) 607 + return "failed (for-each)", fmt.Errorf("Error during ForEach: %w", err) 558 608 } 609 + 610 + log.Info("checkout complete", "numRecords", numRecords, "numCollections", len(collectionsSeen)) 611 + 559 612 cloneState = "success" 560 613 return cloneState, nil 561 614 }
-953
cmd/netsync/playback.go
··· 1 - package main 2 - 3 - import ( 4 - "context" 5 - "encoding/json" 6 - "fmt" 7 - "io/fs" 8 - "net/http" 9 - "os" 10 - "os/signal" 11 - "path/filepath" 12 - "strings" 13 - "sync" 14 - "sync/atomic" 15 - "syscall" 16 - "time" 17 - 18 - "github.com/araddon/dateparse" 19 - "github.com/bluesky-social/indigo/api/bsky" 20 - "github.com/bluesky-social/indigo/repo" 21 - "github.com/gocql/gocql" 22 - "github.com/ipfs/go-cid" 23 - "github.com/prometheus/client_golang/prometheus/promhttp" 24 - "github.com/scylladb/gocqlx/v2" 25 - "github.com/scylladb/gocqlx/v2/qb" 26 - "github.com/scylladb/gocqlx/v2/table" 27 - 28 - "github.com/urfave/cli/v2" 29 - "golang.org/x/text/language" 30 - "golang.org/x/text/message" 31 - ) 32 - 33 - type PlaybackState struct { 34 - EnqueuedRepos map[string]*RepoState 35 - FinishedRepos map[string]*RepoState 36 - 37 - outDir string 38 - 39 - lk sync.RWMutex 40 - wg sync.WaitGroup 41 - exit chan struct{} 42 - workerCount int 43 - 44 - textLen atomic.Uint64 45 - 46 - ses gocqlx.Session 47 - } 48 - 49 - func (s *PlaybackState) Dequeue() string { 50 - s.lk.Lock() 51 - defer s.lk.Unlock() 52 - 53 - enqueuedJobs.Set(float64(len(s.EnqueuedRepos))) 54 - 55 - for repo, state := range s.EnqueuedRepos { 56 - if state.State == "enqueued" { 57 - state.State = "dequeued" 58 - return repo 59 - } 60 - } 61 - 62 - return "" 63 - } 64 - 65 - func (s *PlaybackState) Finish(repo string, state string) { 66 - s.lk.Lock() 67 - defer s.lk.Unlock() 68 - 69 - s.FinishedRepos[repo] = &RepoState{ 70 - Repo: repo, 71 - State: state, 72 - FinishedAt: time.Now(), 73 - } 74 - 75 - finishedJobs.Set(float64(len(s.FinishedRepos))) 76 - 77 - delete(s.EnqueuedRepos, repo) 78 - } 79 - 80 - var postMetadata = table.Metadata{ 81 - Name: "netsync.posts", 82 - Columns: []string{"did", "rkey", "parent_did", "parent_rkey", "content", "embed", "facets", "self_labels", "created_at"}, 83 - PartKey: []string{"did", "rkey"}, 84 - } 85 - var postTable = table.New(postMetadata) 86 - 87 - type Post struct { 88 - Did string 89 - Rkey string 90 - 91 - ParentDid string 92 - ParentRkey string 93 - 94 - Content string 95 - Embed string 96 - Facets string 97 - SelfLabels []string 98 - 99 - CreatedAt time.Time 100 - } 101 - 102 - var postsByDIDMetadata = table.Metadata{ 103 - Name: "netsync.posts_by_did", 104 - Columns: []string{"did", "rkey", "created_at"}, 105 - PartKey: []string{"did"}, 106 - SortKey: []string{"created_at"}, 107 - } 108 - var postsByDIDTable = table.New(postsByDIDMetadata) 109 - 110 - type PostByDID struct { 111 - Did string 112 - Rkey string 113 - CreatedAt time.Time 114 - } 115 - 116 - var repliesMetadata = table.Metadata{ 117 - Name: "netsync.replies", 118 - Columns: []string{"parent_did", "parent_rkey", "child_did", "child_rkey", "created_at"}, 119 - PartKey: []string{"parent_did", "parent_rkey"}, 120 - SortKey: []string{"child_did", "child_rkey"}, 121 - } 122 - var repliesTable = table.New(repliesMetadata) 123 - 124 - type Reply struct { 125 - ParentDid string 126 - ParentRkey string 127 - ChildDid string 128 - ChildRkey string 129 - CreatedAt time.Time 130 - } 131 - 132 - var followsMetadata = table.Metadata{ 133 - Name: "netsync.follows", 134 - Columns: []string{"actor", "rkey", "target", "created_at"}, 135 - PartKey: []string{"actor", "rkey"}, 136 - } 137 - var followsTable = table.New(followsMetadata) 138 - 139 - type Follow struct { 140 - Actor string 141 - Rkey string 142 - Target string 143 - CreatedAt time.Time 144 - } 145 - 146 - var followByActorMetadata = table.Metadata{ 147 - Name: "netsync.follows_by_actor", 148 - Columns: []string{"actor", "target", "created_at"}, 149 - PartKey: []string{"actor", "target"}, 150 - } 151 - var followByActorTable = table.New(followByActorMetadata) 152 - 153 - type FollowByActor struct { 154 - Actor string 155 - Target string 156 - CreatedAt time.Time 157 - } 158 - 159 - var followByTargetMetadata = table.Metadata{ 160 - Name: "netsync.follows_by_target", 161 - Columns: []string{"target", "actor", "created_at"}, 162 - PartKey: []string{"target", "actor"}, 163 - } 164 - var followByTargetTable = table.New(followByTargetMetadata) 165 - 166 - type FollowByTarget struct { 167 - Target string 168 - Actor string 169 - CreatedAt time.Time 170 - } 171 - 172 - var blocksMetadata = table.Metadata{ 173 - Name: "netsync.blocks", 174 - Columns: []string{"actor", "rkey", "target", "created_at"}, 175 - PartKey: []string{"actor", "rkey"}, 176 - } 177 - var blocksTable = table.New(blocksMetadata) 178 - 179 - type Block struct { 180 - Actor string 181 - Rkey string 182 - Target string 183 - CreatedAt time.Time 184 - } 185 - 186 - var blockByActorMetadata = table.Metadata{ 187 - Name: "netsync.blocks_by_actor", 188 - Columns: []string{"actor", "target", "created_at"}, 189 - PartKey: []string{"actor", "target"}, 190 - } 191 - var blockByActorTable = table.New(blockByActorMetadata) 192 - 193 - type BlockByActor struct { 194 - Actor string 195 - Target string 196 - CreatedAt time.Time 197 - } 198 - 199 - var blockByTargetMetadata = table.Metadata{ 200 - Name: "netsync.blocks_by_target", 201 - Columns: []string{"target", "actor", "created_at"}, 202 - PartKey: []string{"target", "actor"}, 203 - } 204 - var blockByTargetTable = table.New(blockByTargetMetadata) 205 - 206 - type BlockByTarget struct { 207 - Target string 208 - Actor string 209 - CreatedAt time.Time 210 - } 211 - 212 - var likesMetadata = table.Metadata{ 213 - Name: "netsync.likes", 214 - Columns: []string{"did", "rkey", "subject", "created_at"}, 215 - PartKey: []string{"did", "rkey"}, 216 - } 217 - var likesTable = table.New(likesMetadata) 218 - 219 - type Like struct { 220 - Did string 221 - Rkey string 222 - Subject string 223 - CreatedAt time.Time 224 - } 225 - 226 - var likeCountMetadata = table.Metadata{ 227 - Name: "netsync.like_counts", 228 - Columns: []string{"did", "nsid", "rkey", "count"}, 229 - PartKey: []string{"did", "nsid", "rkey"}, 230 - } 231 - var likeCountTable = table.New(likeCountMetadata) 232 - 233 - type LikeCount struct { 234 - Did string 235 - Nsid string 236 - Rkey string 237 - Count int64 238 - } 239 - 240 - var repostsMetadata = table.Metadata{ 241 - Name: "netsync.reposts", 242 - Columns: []string{"did", "rkey", "subject", "created_at"}, 243 - PartKey: []string{"did", "rkey"}, 244 - } 245 - var repostsTable = table.New(repostsMetadata) 246 - 247 - type Repost struct { 248 - Did string 249 - Rkey string 250 - Subject string 251 - CreatedAt time.Time 252 - } 253 - 254 - var repostCountMetadata = table.Metadata{ 255 - Name: "netsync.repost_counts", 256 - Columns: []string{"did", "nsid", "rkey", "count"}, 257 - PartKey: []string{"did", "nsid", "rkey"}, 258 - } 259 - var repostCountTable = table.New(repostCountMetadata) 260 - 261 - type RepostCount struct { 262 - Did string 263 - Nsid string 264 - Rkey string 265 - Count int64 266 - } 267 - 268 - func (s *PlaybackState) SetupSchema() error { 269 - if err := s.ses.ExecStmt(`CREATE KEYSPACE IF NOT EXISTS netsync WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };`); err != nil { 270 - return fmt.Errorf("failed to create keyspace: %w", err) 271 - } 272 - 273 - if err := s.ses.ExecStmt(`CREATE TABLE IF NOT EXISTS netsync.posts (did text, rkey text, parent_did text, parent_rkey text, content text, embed text, facets text, self_labels list<text>, created_at timestamp, PRIMARY KEY ((did, rkey)));`); err != nil { 274 - return fmt.Errorf("failed to create posts table: %w", err) 275 - } 276 - 277 - if err := s.ses.ExecStmt(`CREATE TABLE IF NOT EXISTS netsync.posts_by_did (did text, rkey text, created_at timestamp, PRIMARY KEY (did, created_at)) WITH CLUSTERING ORDER BY (created_at DESC);`); err != nil { 278 - return fmt.Errorf("failed to create posts by did table: %w", err) 279 - } 280 - 281 - if err := s.ses.ExecStmt(`CREATE TABLE IF NOT EXISTS netsync.replies (parent_did text, parent_rkey text, child_did text, child_rkey text, created_at timestamp, PRIMARY KEY ((parent_did, parent_rkey), child_did, child_rkey));`); err != nil { 282 - return fmt.Errorf("failed to create replies table: %w", err) 283 - } 284 - 285 - if err := s.ses.ExecStmt(`CREATE TABLE IF NOT EXISTS netsync.follows (actor text, rkey text, target text, created_at timestamp, PRIMARY KEY ((actor, rkey)));`); err != nil { 286 - return fmt.Errorf("failed to create follows table: %w", err) 287 - } 288 - 289 - if err := s.ses.ExecStmt(`CREATE TABLE IF NOT EXISTS netsync.follows_by_actor (actor text, rkey text, target text, created_at timestamp, PRIMARY KEY ((actor, target)));`); err != nil { 290 - return fmt.Errorf("failed to create follows by actor table: %w", err) 291 - } 292 - 293 - if err := s.ses.ExecStmt(`CREATE TABLE IF NOT EXISTS netsync.follows_by_target (target text, actor text, created_at timestamp, PRIMARY KEY ((target, actor)));`); err != nil { 294 - return fmt.Errorf("failed to create follows by target table: %w", err) 295 - } 296 - 297 - if err := s.ses.ExecStmt(`CREATE TABLE IF NOT EXISTS netsync.blocks (actor text, rkey text, target text, created_at timestamp, PRIMARY KEY ((actor, rkey)));`); err != nil { 298 - return fmt.Errorf("failed to create blocks table: %w", err) 299 - } 300 - 301 - if err := s.ses.ExecStmt(`CREATE TABLE IF NOT EXISTS netsync.blocks_by_actor (actor text, target text, created_at timestamp, PRIMARY KEY ((actor, target)));`); err != nil { 302 - return fmt.Errorf("failed to create blocks by actor table: %w", err) 303 - } 304 - 305 - if err := s.ses.ExecStmt(`CREATE TABLE IF NOT EXISTS netsync.blocks_by_target (target text, actor text, created_at timestamp, PRIMARY KEY ((target, actor)));`); err != nil { 306 - return fmt.Errorf("failed to create blocks by target table: %w", err) 307 - } 308 - 309 - if err := s.ses.ExecStmt(`CREATE TABLE IF NOT EXISTS netsync.likes (did text, rkey text, subject text, created_at timestamp, PRIMARY KEY ((did, rkey)));`); err != nil { 310 - return fmt.Errorf("failed to create likes table: %w", err) 311 - } 312 - 313 - if err := s.ses.ExecStmt(`CREATE TABLE IF NOT EXISTS netsync.like_counts (did text, nsid text, rkey text, count counter, PRIMARY KEY ((did, nsid, rkey)));`); err != nil { 314 - return fmt.Errorf("failed to create like counts table: %w", err) 315 - } 316 - 317 - if err := s.ses.ExecStmt(`CREATE TABLE IF NOT EXISTS netsync.reposts (did text, rkey text, subject text, created_at timestamp, PRIMARY KEY ((did, rkey)));`); err != nil { 318 - return fmt.Errorf("failed to create reposts table: %w", err) 319 - } 320 - 321 - if err := s.ses.ExecStmt(`CREATE TABLE IF NOT EXISTS netsync.repost_counts (did text, nsid text, rkey text, count counter, PRIMARY KEY ((did, nsid, rkey)));`); err != nil { 322 - return fmt.Errorf("failed to create repost counts table: %w", err) 323 - } 324 - 325 - return nil 326 - } 327 - 328 - func Playback(cctx *cli.Context) error { 329 - ctx := cctx.Context 330 - ctx, cancel := context.WithCancel(ctx) 331 - defer cancel() 332 - 333 - start := time.Now() 334 - 335 - cluster := gocql.NewCluster(cctx.StringSlice("scylla-nodes")...) 336 - session, err := gocqlx.WrapSession(cluster.CreateSession()) 337 - if err != nil { 338 - return fmt.Errorf("failed to create scylla session: %w", err) 339 - } 340 - 341 - state := &PlaybackState{ 342 - outDir: cctx.String("out-dir"), 343 - workerCount: cctx.Int("worker-count"), 344 - wg: sync.WaitGroup{}, 345 - ses: session, 346 - } 347 - 348 - err = state.SetupSchema() 349 - if err != nil { 350 - return fmt.Errorf("failed to setup schema: %w", err) 351 - } 352 - 353 - state.EnqueuedRepos = make(map[string]*RepoState) 354 - state.FinishedRepos = make(map[string]*RepoState) 355 - 356 - state.exit = make(chan struct{}) 357 - 358 - // Start metrics server 359 - mux := http.NewServeMux() 360 - mux.Handle("/metrics", promhttp.Handler()) 361 - 362 - metricsServer := &http.Server{ 363 - Addr: fmt.Sprintf(":%d", cctx.Int("port")), 364 - Handler: mux, 365 - } 366 - 367 - go func() { 368 - state.wg.Add(1) 369 - defer state.wg.Done() 370 - if err := metricsServer.ListenAndServe(); err != http.ErrServerClosed { 371 - log.Fatalf("failed to start metrics server: %+v", err) 372 - } 373 - log.Info("metrics server shut down successfully") 374 - }() 375 - 376 - // Load all the repos from the out dir 377 - err = filepath.WalkDir(state.outDir, func(path string, d fs.DirEntry, err error) error { 378 - if err != nil { 379 - return fmt.Errorf("failed to walk path: %w", err) 380 - } 381 - 382 - if d.IsDir() { 383 - return nil 384 - } 385 - 386 - state.EnqueuedRepos[d.Name()] = &RepoState{ 387 - Repo: d.Name(), 388 - State: "enqueued", 389 - } 390 - 391 - enqueuedJobs.Inc() 392 - 393 - return nil 394 - }) 395 - if err != nil { 396 - return fmt.Errorf("failed to walk out dir: %w", err) 397 - } 398 - 399 - // Start workers 400 - for i := 0; i < state.workerCount; i++ { 401 - go state.worker(i) 402 - } 403 - 404 - // Check for empty queue 405 - go func() { 406 - state.wg.Add(1) 407 - defer state.wg.Done() 408 - t := time.NewTicker(30 * time.Second) 409 - for { 410 - select { 411 - case <-ctx.Done(): 412 - return 413 - case <-t.C: 414 - state.lk.RLock() 415 - if len(state.EnqueuedRepos) == 0 { 416 - log.Info("no more repos to process, shutting down") 417 - close(state.exit) 418 - return 419 - } 420 - state.lk.RUnlock() 421 - } 422 - } 423 - }() 424 - 425 - // Trap SIGINT to trigger a shutdown. 426 - log.Info("listening for signals") 427 - signals := make(chan os.Signal, 1) 428 - signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 429 - 430 - select { 431 - case sig := <-signals: 432 - cancel() 433 - close(state.exit) 434 - log.Infof("shutting down on signal: %+v", sig) 435 - case <-ctx.Done(): 436 - cancel() 437 - close(state.exit) 438 - log.Info("shutting down on context done") 439 - case <-state.exit: 440 - cancel() 441 - log.Info("shutting down on exit signal") 442 - } 443 - 444 - log.Info("shutting down, waiting for workers to clean up...") 445 - 446 - if err := metricsServer.Shutdown(ctx); err != nil { 447 - log.Errorf("failed to shut down metrics server: %+v", err) 448 - } 449 - 450 - state.wg.Wait() 451 - 452 - p := message.NewPrinter(language.English) 453 - 454 - // Print stats 455 - log.Info(p.Sprintf("processed %d repos and %d UTF-8 text characters in %s", 456 - len(state.FinishedRepos), state.textLen.Load(), time.Since(start))) 457 - log.Info("shut down successfully") 458 - 459 - return nil 460 - } 461 - 462 - func (s *PlaybackState) worker(id int) { 463 - log := log.With("worker", id) 464 - s.wg.Add(1) 465 - defer s.wg.Done() 466 - 467 - for { 468 - select { 469 - case <-s.exit: 470 - return 471 - default: 472 - } 473 - 474 - repo := s.Dequeue() 475 - if repo == "" { 476 - return 477 - } 478 - 479 - processState, err := s.processRepo(context.Background(), repo) 480 - if err != nil { 481 - log.Errorf("failed to process repo (%s): %v", repo, err) 482 - } 483 - 484 - s.Finish(repo, processState) 485 - } 486 - } 487 - 488 - func (s *PlaybackState) processRepo(ctx context.Context, did string) (processState string, err error) { 489 - log := log.With("repo", did) 490 - 491 - log.Debug("processing repo") 492 - 493 - // Open the repo file from the out dir 494 - f, err := os.Open(filepath.Join(s.outDir, did)) 495 - if err != nil { 496 - return "", fmt.Errorf("failed to open repo file: %w", err) 497 - } 498 - defer f.Close() 499 - 500 - r, err := repo.ReadRepoFromCar(ctx, f) 501 - if err != nil { 502 - return "", fmt.Errorf("failed to read repo from car: %w", err) 503 - } 504 - 505 - // maxBatchSize := 1000 506 - 507 - // followBatch := s.ses.NewBatch(gocql.LoggedBatch) 508 - // followBatchSize := 0 509 - 510 - // blockBatch := s.ses.NewBatch(gocql.LoggedBatch) 511 - // blockBatchSize := 0 512 - 513 - // likeBatch := s.ses.NewBatch(gocql.LoggedBatch) 514 - // likeBatchSize := 0 515 - 516 - // repostBatch := s.ses.NewBatch(gocql.LoggedBatch) 517 - // repostBatchSize := 0 518 - 519 - err = r.ForEach(ctx, "", func(path string, _ cid.Cid) error { 520 - select { 521 - case <-s.exit: 522 - return fmt.Errorf("exiting") 523 - default: 524 - } 525 - 526 - _, rec, err := r.GetRecord(ctx, path) 527 - if err != nil { 528 - return fmt.Errorf("failed to get record: %w", err) 529 - } 530 - 531 - rkey := strings.Split(path, "/")[1] 532 - 533 - switch rec := rec.(type) { 534 - case *bsky.FeedPost: 535 - log.Debugf("processing feed post: %s", rec.Text) 536 - s.textLen.Add(uint64(len(rec.Text))) 537 - recCreatedAt, err := dateparse.ParseAny(rec.CreatedAt) 538 - if err != nil { 539 - log.Errorf("failed to parse created at: %+v", err) 540 - return nil 541 - } 542 - 543 - post := Post{ 544 - Did: did, 545 - Rkey: rkey, 546 - Content: rec.Text, 547 - CreatedAt: recCreatedAt, 548 - } 549 - 550 - facets := "" 551 - if rec.Facets != nil && len(rec.Facets) > 0 { 552 - nonNilFacets := []*bsky.RichtextFacet{} 553 - 554 - // Filter out nil facets 555 - for i, facet := range rec.Facets { 556 - for _, feature := range facet.Features { 557 - if feature.RichtextFacet_Link != nil || feature.RichtextFacet_Mention != nil { 558 - nonNilFacets = append(nonNilFacets, rec.Facets[i]) 559 - break 560 - } 561 - } 562 - } 563 - 564 - facetBytes, err := json.Marshal(nonNilFacets) 565 - if err != nil { 566 - log.Errorf("failed to marshal facets: %+v", err) 567 - return nil 568 - } 569 - facets = string(facetBytes) 570 - } 571 - 572 - embed := "" 573 - if rec.Embed != nil { 574 - // Filter out empty embeds 575 - if rec.Embed.EmbedExternal != nil || 576 - rec.Embed.EmbedImages != nil || 577 - rec.Embed.EmbedRecord != nil || 578 - rec.Embed.EmbedRecordWithMedia != nil { 579 - embedBytes, err := json.Marshal(rec.Embed) 580 - if err != nil { 581 - log.Errorf("failed to marshal embed: %+v", err) 582 - return nil 583 - } 584 - embed = string(embedBytes) 585 - } 586 - } 587 - 588 - selfLabels := []string{} 589 - 590 - if rec.Labels != nil && 591 - rec.Labels.LabelDefs_SelfLabels != nil && 592 - len(rec.Labels.LabelDefs_SelfLabels.Values) > 0 { 593 - for _, label := range rec.Labels.LabelDefs_SelfLabels.Values { 594 - selfLabels = append(selfLabels, label.Val) 595 - } 596 - } 597 - 598 - parentParts := []string{} 599 - if rec.Reply != nil && rec.Reply.Parent != nil { 600 - // at://did/app.bsky.feed.post/rkey 601 - parentURI := rec.Reply.Parent.Uri 602 - parentURI = strings.TrimPrefix(parentURI, "at://") 603 - parentParts = strings.Split(parentURI, "/") 604 - if len(parentParts) != 3 { 605 - log.Errorf("invalid parent URI: %s", parentURI) 606 - return nil 607 - } 608 - } 609 - 610 - if facets != "" { 611 - post.Facets = facets 612 - } 613 - 614 - if embed != "" { 615 - post.Embed = embed 616 - } 617 - 618 - if len(parentParts) > 0 { 619 - post.ParentDid = parentParts[0] 620 - post.ParentRkey = parentParts[2] 621 - } 622 - 623 - if len(selfLabels) > 0 { 624 - post.SelfLabels = selfLabels 625 - } 626 - 627 - // Insert into the DID+RKey lookup table 628 - insertPost := postTable.InsertQuery(s.ses) 629 - err = insertPost.BindStruct(&post).ExecRelease() 630 - if err != nil { 631 - log.Errorf("failed to bind post: %w", err) 632 - return nil 633 - } 634 - 635 - // Insert into post into author-indexed table 636 - insertPostByDID := postsByDIDTable.InsertQuery(s.ses) 637 - err = insertPostByDID.BindStruct(&PostByDID{ 638 - Did: did, 639 - Rkey: rkey, 640 - CreatedAt: recCreatedAt, 641 - }).ExecRelease() 642 - if err != nil { 643 - log.Errorf("failed to exec post by did: %w", err) 644 - return nil 645 - } 646 - 647 - // Insert into the reply lookup table 648 - if len(parentParts) > 0 { 649 - insertReply := repliesTable.InsertQuery(s.ses) 650 - err = insertReply.BindStruct(&Reply{ 651 - ParentDid: parentParts[0], 652 - ParentRkey: parentParts[2], 653 - ChildDid: did, 654 - ChildRkey: rkey, 655 - CreatedAt: recCreatedAt, 656 - }).ExecRelease() 657 - if err != nil { 658 - log.Errorf("failed to exec reply: %w", err) 659 - return nil 660 - } 661 - } 662 - case *bsky.FeedLike: 663 - log.Debugf("processing feed like: %s", rec.Subject.Uri) 664 - recCreatedAt, err := dateparse.ParseAny(rec.CreatedAt) 665 - if err != nil { 666 - log.Errorf("failed to parse created at: %+v", err) 667 - return nil 668 - } 669 - 670 - // insertLike := likesTable.InsertQuery(s.ses) 671 - // err = likeBatch.BindStruct(insertLike, &Like{ 672 - // Did: did, 673 - // Rkey: rkey, 674 - // Subject: rec.Subject.Uri, 675 - // CreatedAt: recCreatedAt, 676 - // }) 677 - // if err != nil { 678 - // log.Errorf("failed to bind like: %w", err) 679 - // return nil 680 - // } 681 - // likeBatchSize++ 682 - 683 - // Insert into the DID+RKey lookup table 684 - insertLike := likesTable.InsertQuery(s.ses) 685 - err = insertLike.BindStruct(&Like{ 686 - Did: did, 687 - Rkey: rkey, 688 - Subject: rec.Subject.Uri, 689 - CreatedAt: recCreatedAt, 690 - }).ExecRelease() 691 - if err != nil { 692 - log.Errorf("failed to exec like: %w", err) 693 - return nil 694 - } 695 - 696 - subj := strings.TrimPrefix(rec.Subject.Uri, "at://") 697 - subjParts := strings.Split(subj, "/") 698 - if len(subjParts) != 3 { 699 - log.Errorf("invalid subject: %s", rec.Subject.Uri) 700 - return nil 701 - } 702 - 703 - // Increment counter for subject 704 - updateLikeCount := likeCountTable.UpdateBuilder(). 705 - Add("count").Where(qb.Eq("did"), qb.Eq("nsid"), qb.Eq("rkey")).Query(s.ses). 706 - BindStruct(&LikeCount{ 707 - Did: subjParts[0], 708 - Nsid: subjParts[1], 709 - Rkey: subjParts[2], 710 - Count: 1, 711 - }) 712 - 713 - err = updateLikeCount.ExecRelease() 714 - if err != nil { 715 - log.Errorf("failed to exec like count: %w", err) 716 - return nil 717 - } 718 - 719 - case *bsky.FeedRepost: 720 - log.Debugf("processing feed repost: %s", rec.Subject.Uri) 721 - recCreatedAt, err := dateparse.ParseAny(rec.CreatedAt) 722 - if err != nil { 723 - log.Errorf("failed to parse created at: %+v", err) 724 - return nil 725 - } 726 - 727 - // insertRepost := repostsTable.InsertQuery(s.ses) 728 - // err = repostBatch.BindStruct(insertRepost, &Repost{ 729 - // Did: did, 730 - // Rkey: rkey, 731 - // Subject: rec.Subject.Uri, 732 - // CreatedAt: recCreatedAt, 733 - // }) 734 - // if err != nil { 735 - // log.Errorf("failed to bind repost: %w", err) 736 - // return nil 737 - // } 738 - // repostBatchSize++ 739 - 740 - // Insert into the DID+RKey lookup table 741 - insertRepost := repostsTable.InsertQuery(s.ses) 742 - err = insertRepost.BindStruct(&Repost{ 743 - Did: did, 744 - Rkey: rkey, 745 - Subject: rec.Subject.Uri, 746 - CreatedAt: recCreatedAt, 747 - }).ExecRelease() 748 - if err != nil { 749 - log.Errorf("failed to exec repost: %w", err) 750 - return nil 751 - } 752 - 753 - subj := strings.TrimPrefix(rec.Subject.Uri, "at://") 754 - subjParts := strings.Split(subj, "/") 755 - if len(subjParts) != 3 { 756 - log.Errorf("invalid subject: %s", rec.Subject.Uri) 757 - return nil 758 - } 759 - 760 - // Increment counter for subject 761 - updateRepostCount := repostCountTable.UpdateBuilder(). 762 - Add("count").Where(qb.Eq("did"), qb.Eq("nsid"), qb.Eq("rkey")).Query(s.ses). 763 - BindStruct(&RepostCount{ 764 - Did: subjParts[0], 765 - Nsid: subjParts[1], 766 - Rkey: subjParts[2], 767 - Count: 1, 768 - }) 769 - 770 - err = updateRepostCount.ExecRelease() 771 - if err != nil { 772 - log.Errorf("failed to exec repost count: %w", err) 773 - return nil 774 - } 775 - case *bsky.GraphFollow: 776 - log.Debugf("processing graph follow: %s", rec.Subject) 777 - recCreatedAt, err := dateparse.ParseAny(rec.CreatedAt) 778 - if err != nil { 779 - log.Errorf("failed to parse created at: %+v", err) 780 - return nil 781 - } 782 - 783 - // Batch 784 - // insertFollow := followsTable.InsertQuery(s.ses) 785 - // err = followBatch.BindStruct(insertFollow, &FollowByActor{ 786 - // Actor: did, 787 - // Target: rec.Subject, 788 - // CreatedAt: recCreatedAt, 789 - // }) 790 - // if err != nil { 791 - // log.Errorf("failed to bind follow: %w", err) 792 - // return nil 793 - // } 794 - // followBatchSize++ 795 - 796 - // Insert follow to DID+RKey Lookup table 797 - insertFollow := followsTable.InsertQuery(s.ses) 798 - err = insertFollow.BindStruct(&Follow{ 799 - Actor: did, 800 - Rkey: rkey, 801 - Target: rec.Subject, 802 - CreatedAt: recCreatedAt, 803 - }).ExecRelease() 804 - if err != nil { 805 - log.Errorf("failed to exec follow: %w", err) 806 - return nil 807 - } 808 - 809 - // Insert follow to Actor Lookup table 810 - insertFollowByActor := followByActorTable.InsertQuery(s.ses) 811 - err = insertFollowByActor.BindStruct(&FollowByActor{ 812 - Actor: did, 813 - Target: rec.Subject, 814 - CreatedAt: recCreatedAt, 815 - }).ExecRelease() 816 - if err != nil { 817 - log.Errorf("failed to exec follow by actor: %w", err) 818 - return nil 819 - } 820 - 821 - // Insert follow to Target Lookup table 822 - insertFollowByTarget := followByTargetTable.InsertQuery(s.ses) 823 - err = insertFollowByTarget.BindStruct(&FollowByTarget{ 824 - Target: rec.Subject, 825 - Actor: did, 826 - CreatedAt: recCreatedAt, 827 - }).ExecRelease() 828 - if err != nil { 829 - log.Errorf("failed to exec follow by target: %w", err) 830 - return nil 831 - } 832 - case *bsky.GraphBlock: 833 - log.Debugf("processing graph block: %s", rec.Subject) 834 - recCreatedAt, err := dateparse.ParseAny(rec.CreatedAt) 835 - if err != nil { 836 - log.Errorf("failed to parse created at: %+v", err) 837 - return nil 838 - } 839 - 840 - // Insert follow to DID+RKey Lookup table 841 - insertBlock := blocksTable.InsertQuery(s.ses) 842 - err = insertBlock.BindStruct(&Block{ 843 - Actor: did, 844 - Rkey: rkey, 845 - Target: rec.Subject, 846 - CreatedAt: recCreatedAt, 847 - }).ExecRelease() 848 - if err != nil { 849 - log.Errorf("failed to exec block: %w", err) 850 - return nil 851 - } 852 - 853 - // Insert block to Actor Lookup table 854 - insertBlockByActor := blockByActorTable.InsertQuery(s.ses) 855 - err = insertBlockByActor.BindStruct(&BlockByActor{ 856 - Actor: did, 857 - Target: rec.Subject, 858 - CreatedAt: recCreatedAt, 859 - }).ExecRelease() 860 - if err != nil { 861 - log.Errorf("failed to exec block by actor: %w", err) 862 - return nil 863 - } 864 - 865 - // Insert block to Target Lookup table 866 - insertBlockByTarget := blockByTargetTable.InsertQuery(s.ses) 867 - err = insertBlockByTarget.BindStruct(&BlockByTarget{ 868 - Target: rec.Subject, 869 - Actor: did, 870 - CreatedAt: recCreatedAt, 871 - }).ExecRelease() 872 - if err != nil { 873 - log.Errorf("failed to exec block by target: %w", err) 874 - return nil 875 - } 876 - case *bsky.ActorProfile: 877 - if rec.DisplayName != nil { 878 - log.Debugf("processing actor profile: %s", *rec.DisplayName) 879 - } 880 - } 881 - 882 - // if followBatchSize >= maxBatchSize { 883 - // err = s.ses.ExecuteBatch(followBatch) 884 - // if err != nil { 885 - // log.Errorf("failed to execute batch: %w", err) 886 - // } 887 - // followBatch = s.ses.NewBatch(gocql.LoggedBatch) 888 - // followBatchSize = 0 889 - // } 890 - 891 - // if blockBatchSize >= maxBatchSize { 892 - // err = s.ses.ExecuteBatch(blockBatch) 893 - // if err != nil { 894 - // log.Errorf("failed to execute batch: %w", err) 895 - // } 896 - // blockBatch = s.ses.NewBatch(gocql.LoggedBatch) 897 - // blockBatchSize = 0 898 - // } 899 - 900 - // if likeBatchSize >= maxBatchSize { 901 - // err = s.ses.ExecuteBatch(likeBatch) 902 - // if err != nil { 903 - // log.Errorf("failed to execute batch: %w", err) 904 - // } 905 - // likeBatch = s.ses.NewBatch(gocql.LoggedBatch) 906 - // likeBatchSize = 0 907 - // } 908 - 909 - // if repostBatchSize >= maxBatchSize { 910 - // err = s.ses.ExecuteBatch(repostBatch) 911 - // if err != nil { 912 - // log.Errorf("failed to execute batch: %w", err) 913 - // } 914 - // repostBatch = s.ses.NewBatch(gocql.LoggedBatch) 915 - // repostBatchSize = 0 916 - // } 917 - 918 - return nil 919 - }) 920 - if err != nil { 921 - return "failed (repo foreach)", fmt.Errorf("failed to process repo: %w", err) 922 - } 923 - 924 - // if followBatchSize > 0 { 925 - // err = s.ses.ExecuteBatch(followBatch) 926 - // if err != nil { 927 - // return "failed (batch)", fmt.Errorf("failed to execute batch: %w", err) 928 - // } 929 - // } 930 - 931 - // if blockBatchSize > 0 { 932 - // err = s.ses.ExecuteBatch(blockBatch) 933 - // if err != nil { 934 - // return "failed (batch)", fmt.Errorf("failed to execute batch: %w", err) 935 - // } 936 - // } 937 - 938 - // if likeBatchSize > 0 { 939 - // err = s.ses.ExecuteBatch(likeBatch) 940 - // if err != nil { 941 - // return "failed (batch)", fmt.Errorf("failed to execute batch: %w", err) 942 - // } 943 - // } 944 - 945 - // if repostBatchSize > 0 { 946 - // err = s.ses.ExecuteBatch(repostBatch) 947 - // if err != nil { 948 - // return "failed (batch)", fmt.Errorf("failed to execute batch: %w", err) 949 - // } 950 - // } 951 - 952 - return "finished", nil 953 - }
-360
cmd/netsync/query.go
··· 1 - package main 2 - 3 - import ( 4 - "bufio" 5 - "context" 6 - "fmt" 7 - "os" 8 - "strings" 9 - "sync" 10 - "sync/atomic" 11 - "time" 12 - 13 - "github.com/gocql/gocql" 14 - "github.com/scylladb/gocqlx/v2" 15 - "github.com/scylladb/gocqlx/v2/qb" 16 - "github.com/urfave/cli/v2" 17 - "golang.org/x/sync/semaphore" 18 - "golang.org/x/text/language" 19 - "golang.org/x/text/message" 20 - ) 21 - 22 - func GetPostsForUser(cctx *cli.Context) error { 23 - ctx := cctx.Context 24 - ctx, cancel := context.WithCancel(ctx) 25 - defer cancel() 26 - 27 - // cluster := gocql.NewCluster(cctx.StringSlice("scylla-nodes")...) 28 - // session, err := gocqlx.WrapSession(cluster.CreateSession()) 29 - // if err != nil { 30 - // return fmt.Errorf("failed to create scylla session: %w", err) 31 - // } 32 - 33 - // args := cctx.Args() 34 - // if args.Len() != 1 { 35 - // return fmt.Errorf("must provide a did") 36 - // } 37 - 38 - // did := args.First() 39 - 40 - // limit := 500 41 - 42 - // numRuns := 2000 43 - // maxConcurrent := 40 44 - // sem := semaphore.NewWeighted(int64(maxConcurrent)) 45 - 46 - // totalRowsRead := atomic.Uint64{} 47 - 48 - // runtimes := make(chan time.Duration, numRuns) 49 - 50 - // start := time.Now() 51 - 52 - return nil 53 - } 54 - 55 - func Trim(cctx *cli.Context) error { 56 - ctx := cctx.Context 57 - ctx, cancel := context.WithCancel(ctx) 58 - defer cancel() 59 - 60 - limit := uint(20_001) 61 - 62 - // Read repo list 63 - repoListFile, err := os.Open(cctx.String("repo-list")) 64 - if err != nil { 65 - return err 66 - } 67 - 68 - fileScanner := bufio.NewScanner(repoListFile) 69 - fileScanner.Split(bufio.ScanLines) 70 - 71 - repos := []string{} 72 - 73 - for fileScanner.Scan() { 74 - repo := fileScanner.Text() 75 - repos = append(repos, repo) 76 - } 77 - 78 - cluster := gocql.NewCluster(cctx.StringSlice("scylla-nodes")...) 79 - session, err := gocqlx.WrapSession(cluster.CreateSession()) 80 - if err != nil { 81 - return fmt.Errorf("failed to create scylla session: %w", err) 82 - } 83 - 84 - // Trim posts_by_did to 20,000 posts 85 - // Select 20k posts, grab the `created_at` of the last one, then delete all posts with a `created_at` less than that 86 - 87 - type output struct { 88 - err error 89 - msg string 90 - duration time.Duration 91 - } 92 - 93 - st := time.Now() 94 - outChan := make(chan output, len(repos)) 95 - sem := semaphore.NewWeighted(int64(cctx.Int("worker-count"))) 96 - var wg sync.WaitGroup 97 - 98 - // Run in parallel 99 - for _, repo := range repos { 100 - wg.Add(1) 101 - go func(repo string) { 102 - defer wg.Done() 103 - sem.Acquire(ctx, 1) 104 - defer sem.Release(1) 105 - 106 - start := time.Now() 107 - log := log.With("repo", repo) 108 - 109 - posts := []PostByDID{} 110 - err = postsByDIDTable.SelectBuilder(). 111 - Limit(limit). 112 - OrderBy("created_at", qb.DESC). 113 - QueryContext(ctx, session). 114 - BindStruct(&PostByDID{Did: repo}). 115 - SelectRelease(&posts) 116 - if err != nil { 117 - outChan <- output{err: fmt.Errorf("failed to get posts: %w", err)} 118 - return 119 - } 120 - 121 - if len(posts) == 0 { 122 - outChan <- output{msg: "no posts found for DID", duration: time.Since(start)} 123 - return 124 - } 125 - 126 - loadTime := time.Now() 127 - 128 - log.Debugw("got posts", "num_posts", len(posts), "duration", loadTime.Sub(start).String()) 129 - 130 - if len(posts) < int(limit) { 131 - outChan <- output{msg: "no posts to trim", duration: time.Since(start)} 132 - return 133 - } 134 - 135 - // Get the last post's created_at 136 - lastPostCreatedAt := posts[len(posts)-2].CreatedAt 137 - 138 - // Delete all posts with a created_at less than the last post's created_at 139 - err = qb.Delete(postsByDIDTable.Name()). 140 - Where(qb.Eq("did"), qb.Lt("created_at")). 141 - QueryContext(ctx, session). 142 - BindStruct(&PostByDID{Did: repo, CreatedAt: lastPostCreatedAt}). 143 - ExecRelease() 144 - if err != nil { 145 - outChan <- output{err: fmt.Errorf("failed to delete posts: %w", err)} 146 - return 147 - } 148 - 149 - deleteTime := time.Now() 150 - 151 - log.Debugw("deleted posts", "duration", deleteTime.Sub(loadTime).String()) 152 - 153 - outChan <- output{msg: "trimmed posts", duration: time.Since(start)} 154 - }(repo) 155 - } 156 - 157 - // Wait for all the queries to finish 158 - wg.Wait() 159 - close(outChan) 160 - 161 - end := time.Now() 162 - 163 - // Enumerate the results 164 - var total time.Duration 165 - errs := []error{} 166 - successes := 0 167 - for out := range outChan { 168 - if out.err != nil { 169 - errs = append(errs, out.err) 170 - } 171 - if out.duration > 0 { 172 - total += out.duration 173 - successes++ 174 - } 175 - if out.msg != "" { 176 - log.Debug(out.msg) 177 - } 178 - } 179 - 180 - // Calculate the average runtime 181 - avg := total / time.Duration(successes) 182 - 183 - p := message.NewPrinter(language.English) 184 - 185 - log.Info(p.Sprintf("trimmed %d repos (%d errors) in %s (avg: %s, total: %s)", successes, len(errs), end.Sub(st), avg, total)) 186 - 187 - return nil 188 - } 189 - 190 - func Query(cctx *cli.Context) error { 191 - ctx := cctx.Context 192 - ctx, cancel := context.WithCancel(ctx) 193 - defer cancel() 194 - 195 - cluster := gocql.NewCluster(cctx.StringSlice("scylla-nodes")...) 196 - session, err := gocqlx.WrapSession(cluster.CreateSession()) 197 - if err != nil { 198 - return fmt.Errorf("failed to create scylla session: %w", err) 199 - } 200 - 201 - args := cctx.Args() 202 - if args.Len() != 1 { 203 - return fmt.Errorf("must provide a post URI") 204 - } 205 - postURI := args.First() 206 - 207 - // at://did/app.bsky.feed.post/rkey 208 - postURI = strings.TrimPrefix(postURI, "at://") 209 - postParts := strings.Split(postURI, "/") 210 - if len(postParts) != 3 { 211 - return fmt.Errorf("invalid post URI: %s", postURI) 212 - } 213 - 214 - numRuns := 500000 215 - maxConcurrent := 400 216 - sem := semaphore.NewWeighted(int64(maxConcurrent)) 217 - 218 - totalRowsRead := atomic.Uint64{} 219 - 220 - runtimes := make(chan time.Duration, numRuns) 221 - 222 - start := time.Now() 223 - 224 - // Run the query in numRuns goroutines 225 - var pwg sync.WaitGroup 226 - for i := 0; i < numRuns; i++ { 227 - pwg.Add(1) 228 - go func() error { 229 - defer pwg.Done() 230 - sem.Acquire(ctx, 1) 231 - defer sem.Release(1) 232 - iterStart := time.Now() 233 - defer func() { 234 - runtimes <- time.Since(iterStart) 235 - }() 236 - 237 - // Get the post 238 - post := Post{ 239 - Did: postParts[0], 240 - Rkey: postParts[2], 241 - } 242 - err = postTable.GetQuery(session).BindStruct(&post).GetRelease(&post) 243 - if err != nil { 244 - return fmt.Errorf("failed to get post: %w", err) 245 - } 246 - 247 - totalRowsRead.Add(1) 248 - 249 - // Get the replies 250 - replyRefs := []Reply{} 251 - err = repliesTable.SelectQuery(session).BindStruct(&Reply{ 252 - ParentDid: postParts[0], 253 - ParentRkey: postParts[2], 254 - }).SelectRelease(&replyRefs) 255 - if err != nil { 256 - return fmt.Errorf("failed to get replies: %w", err) 257 - } 258 - 259 - totalRowsRead.Add(uint64(len(replyRefs))) 260 - 261 - replies := []Post{} 262 - lk := sync.Mutex{} 263 - 264 - // Resolve the replies as posts in parallel 265 - var wg sync.WaitGroup 266 - for i := range replyRefs { 267 - wg.Add(1) 268 - replyRef := replyRefs[i] 269 - go func(replyRef Reply) { 270 - defer wg.Done() 271 - 272 - reply := Post{ 273 - Did: replyRef.ChildDid, 274 - Rkey: replyRef.ChildRkey, 275 - } 276 - 277 - err = postTable.GetQuery(session).BindStruct(&reply).GetRelease(&reply) 278 - if err != nil { 279 - log.Errorf("failed to get reply: %+v", err) 280 - return 281 - } 282 - lk.Lock() 283 - replies = append(replies, reply) 284 - lk.Unlock() 285 - }(replyRef) 286 - } 287 - 288 - totalRowsRead.Add(uint64(len(replies))) 289 - 290 - // Resolve the parent up to the root 291 - parents := []Post{} 292 - if post.ParentDid != "" && post.ParentRkey != "" { 293 - wg.Add(1) 294 - go func() { 295 - defer wg.Done() 296 - parentDid := post.ParentDid 297 - parentRkey := post.ParentRkey 298 - for { 299 - parent := Post{ 300 - Did: parentDid, 301 - Rkey: parentRkey, 302 - } 303 - err = postTable.GetQuery(session).BindStruct(&parent).GetRelease(&parent) 304 - if err != nil && err != gocql.ErrNotFound { 305 - log.Errorf("failed to get parent: %+v", err) 306 - return 307 - } 308 - 309 - parents = append(parents, parent) 310 - 311 - if parent.ParentDid == "" { 312 - break 313 - } 314 - 315 - parentDid = parent.ParentDid 316 - parentRkey = parent.ParentRkey 317 - } 318 - }() 319 - } 320 - 321 - totalRowsRead.Add(uint64(len(parents))) 322 - 323 - wg.Wait() 324 - return nil 325 - }() 326 - } 327 - 328 - // // Print the thread 329 - // p := message.NewPrinter(language.English) 330 - // log.Debugf("post: %s", post.Content) 331 - // log.Debugf("replies: %d", len(replies)) 332 - // for _, reply := range replies { 333 - // log.Debugf(" %s", reply.Content) 334 - // } 335 - // slices.Reverse(parents) 336 - // log.Debugf("parents: %d", len(parents)) 337 - // for _, parent := range parents { 338 - // log.Debugf(" %s", parent.Content) 339 - // } 340 - 341 - // log.Info(p.Sprintf("processed post with %d replies and resolved %d parents in %s", len(replies), len(parents), time.Since(start))) 342 - 343 - // Wait for all the queries to finish 344 - pwg.Wait() 345 - clockTime := time.Since(start) 346 - close(runtimes) 347 - 348 - // Calculate the average runtime 349 - var total time.Duration 350 - for runtime := range runtimes { 351 - total += runtime 352 - } 353 - avg := total / time.Duration(numRuns) 354 - 355 - p := message.NewPrinter(language.English) 356 - 357 - log.Info(p.Sprintf("processed post %d times (%d total reads) in %s (avg: %s, total: %s)", numRuns, totalRowsRead.Load(), clockTime, avg, total)) 358 - 359 - return nil 360 - }