this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at feat/repo-parse-allocs 997 lines 23 kB view raw
1package main 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "net/http" 10 "os" 11 "os/signal" 12 "strconv" 13 "strings" 14 "sync" 15 "syscall" 16 "time" 17 18 "github.com/bluesky-social/indigo/api/atproto" 19 comatproto "github.com/bluesky-social/indigo/api/atproto" 20 "github.com/bluesky-social/indigo/api/bsky" 21 "github.com/bluesky-social/indigo/atproto/identity" 22 "github.com/bluesky-social/indigo/atproto/syntax" 23 "github.com/bluesky-social/indigo/did" 24 "github.com/bluesky-social/indigo/events" 25 "github.com/bluesky-social/indigo/events/schedulers/sequential" 26 lexutil "github.com/bluesky-social/indigo/lex/util" 27 "github.com/bluesky-social/indigo/repo" 28 "github.com/bluesky-social/indigo/repomgr" 29 "github.com/bluesky-social/indigo/util" 30 "github.com/bluesky-social/indigo/util/cliutil" 31 "github.com/bluesky-social/indigo/xrpc" 32 33 "github.com/gorilla/websocket" 34 "github.com/ipfs/go-cid" 35 "github.com/ipfs/go-libipfs/blocks" 36 "github.com/ipld/go-car/v2" 37 cli "github.com/urfave/cli/v2" 38) 39 40var debugCmd = &cli.Command{ 41 Name: "debug", 42 Usage: "a set of debugging utilities for atproto", 43 Subcommands: []*cli.Command{ 44 inspectEventCmd, 45 debugStreamCmd, 46 debugFeedGenCmd, 47 debugFeedViewCmd, 48 compareStreamsCmd, 49 debugGetRepoCmd, 50 debugCompareReposCmd, 51 }, 52} 53 54var inspectEventCmd = &cli.Command{ 55 Name: "inspect-event", 56 Flags: []cli.Flag{ 57 &cli.StringFlag{ 58 Name: "host", 59 Required: true, 60 }, 61 &cli.BoolFlag{ 62 Name: "dump-raw-blocks", 63 }, 64 }, 65 ArgsUsage: `<cursor>`, 66 Action: func(cctx *cli.Context) error { 67 n, err := strconv.Atoi(cctx.Args().First()) 68 if err != nil { 69 return err 70 } 71 72 h := cctx.String("host") 73 74 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", h, n-1) 75 d := websocket.DefaultDialer 76 con, _, err := d.Dial(url, http.Header{}) 77 if err != nil { 78 return fmt.Errorf("dial failure: %w", err) 79 } 80 81 var errFoundIt = fmt.Errorf("gotem") 82 83 var match *comatproto.SyncSubscribeRepos_Commit 84 85 ctx := context.TODO() 86 rsc := &events.RepoStreamCallbacks{ 87 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 88 n := int64(n) 89 if evt.Seq == n { 90 match = evt 91 return errFoundIt 92 } 93 if evt.Seq > n { 94 return fmt.Errorf("record not found in stream") 95 } 96 97 return nil 98 }, 99 RepoInfo: func(evt *comatproto.SyncSubscribeRepos_Info) error { 100 return nil 101 }, 102 // TODO: all the other Repo* event types 103 Error: func(evt *events.ErrorFrame) error { 104 return fmt.Errorf("%s: %s", evt.Error, evt.Message) 105 }, 106 } 107 108 seqScheduler := sequential.NewScheduler("debug-inspect-event", rsc.EventHandler) 109 err = events.HandleRepoStream(ctx, con, seqScheduler, nil) 110 if err != errFoundIt { 111 return err 112 } 113 114 b, err := json.MarshalIndent(match, "", " ") 115 if err != nil { 116 return err 117 } 118 fmt.Println(string(b)) 119 120 br, err := car.NewBlockReader(bytes.NewReader(match.Blocks)) 121 if err != nil { 122 return err 123 } 124 125 fmt.Println("\nSlice Dump:") 126 fmt.Println("Root: ", br.Roots[0]) 127 for { 128 blk, err := br.Next() 129 if err != nil { 130 if err == io.EOF { 131 break 132 } 133 return err 134 } 135 136 fmt.Println(blk.Cid()) 137 if cctx.Bool("dump-raw-blocks") { 138 fmt.Printf("%x\n", blk.RawData()) 139 } 140 } 141 142 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(match.Blocks)) 143 if err != nil { 144 return fmt.Errorf("opening repo from slice: %w", err) 145 } 146 147 fmt.Println("\nOps: ") 148 for _, op := range match.Ops { 149 switch repomgr.EventKind(op.Action) { 150 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 151 rcid, _, err := r.GetRecord(ctx, op.Path) 152 if err != nil { 153 return fmt.Errorf("loading %q: %w", op.Path, err) 154 } 155 if rcid != cid.Cid(*op.Cid) { 156 return fmt.Errorf("mismatch in record cid %s != %s", rcid, *op.Cid) 157 } 158 fmt.Printf("%s (%s): %s\n", op.Action, op.Path, *op.Cid) 159 } 160 } 161 162 return nil 163 }, 164} 165 166type eventInfo struct { 167 LastSeq int64 168 LastRev string 169} 170 171func cidStr(c *lexutil.LexLink) string { 172 if c == nil { 173 return "<nil>" 174 } 175 176 return c.String() 177} 178 179var debugStreamCmd = &cli.Command{ 180 Name: "debug-stream", 181 Flags: []cli.Flag{ 182 &cli.StringFlag{ 183 Name: "host", 184 Required: true, 185 }, 186 &cli.BoolFlag{ 187 Name: "dump-raw-blocks", 188 }, 189 }, 190 ArgsUsage: `<cursor>`, 191 Action: func(cctx *cli.Context) error { 192 n, err := strconv.Atoi(cctx.Args().First()) 193 if err != nil { 194 return err 195 } 196 197 h := cctx.String("host") 198 199 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", h, n) 200 d := websocket.DefaultDialer 201 con, _, err := d.Dial(url, http.Header{}) 202 if err != nil { 203 return fmt.Errorf("dial failure: %w", err) 204 } 205 206 infos := make(map[string]*eventInfo) 207 208 var lastSeq int64 = -1 209 ctx := context.TODO() 210 rsc := &events.RepoStreamCallbacks{ 211 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 212 213 fmt.Printf("\rChecking seq: %d ", evt.Seq) 214 if lastSeq > 0 && evt.Seq != lastSeq+1 { 215 fmt.Println("Gap in sequence numbers: ", lastSeq, evt.Seq) 216 } 217 lastSeq = evt.Seq 218 219 if !evt.TooBig { 220 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 221 if err != nil { 222 fmt.Printf("\nEvent at sequence %d had an invalid repo slice: %s\n", evt.Seq, err) 223 return nil 224 } else { 225 _ = r 226 /* "prev" is no longer included in #commit messages 227 prev, err := r.PrevCommit(ctx) 228 if err != nil { 229 return err 230 } 231 232 var cs, es string 233 if prev != nil { 234 cs = prev.String() 235 } 236 237 if evt.Prev != nil { 238 es = evt.Prev.String() 239 } 240 241 if !evt.Rebase && cs != es { 242 fmt.Printf("\nEvent at sequence %d has mismatch between slice prev and struct prev: %s != %s\n", evt.Seq, prev, evt.Prev) 243 } 244 */ 245 } 246 } 247 248 cur, ok := infos[evt.Repo] 249 if ok { 250 if evt.Since != nil && cur.LastRev != *evt.Since { 251 /* 252 fmt.Println() 253 fmt.Printf("Event at sequence %d, repo=%s had since=%s, but last rev we saw was %s (seq=%d)\n", evt.Seq, evt.Repo, evt.Since, cur.LastRev, cur.LastSeq) 254 */ 255 } 256 } 257 258 infos[evt.Repo] = &eventInfo{ 259 LastSeq: evt.Seq, 260 LastRev: evt.Rev, 261 } 262 263 return nil 264 }, 265 RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { 266 fmt.Printf("\rChecking seq: %d ", evt.Seq) 267 if lastSeq > 0 && evt.Seq != lastSeq+1 { 268 fmt.Println("Gap in sequence numbers: ", lastSeq, evt.Seq) 269 } 270 lastSeq = evt.Seq 271 return nil 272 }, 273 RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error { 274 fmt.Printf("\rChecking seq: %d ", evt.Seq) 275 if lastSeq > 0 && evt.Seq != lastSeq+1 { 276 fmt.Println("Gap in sequence numbers: ", lastSeq, evt.Seq) 277 } 278 lastSeq = evt.Seq 279 return nil 280 }, 281 RepoInfo: func(evt *comatproto.SyncSubscribeRepos_Info) error { 282 return nil 283 }, 284 // TODO: all the other Repo* event types 285 Error: func(evt *events.ErrorFrame) error { 286 return fmt.Errorf("%s: %s", evt.Error, evt.Message) 287 }, 288 } 289 seqScheduler := sequential.NewScheduler("debug-stream", rsc.EventHandler) 290 err = events.HandleRepoStream(ctx, con, seqScheduler, nil) 291 if err != nil { 292 return err 293 } 294 295 return nil 296 }, 297} 298 299var compareStreamsCmd = &cli.Command{ 300 Name: "compare-streams", 301 Flags: []cli.Flag{ 302 &cli.StringFlag{ 303 Name: "host1", 304 Required: true, 305 }, 306 &cli.StringFlag{ 307 Name: "host2", 308 Required: true, 309 }, 310 }, 311 ArgsUsage: `<cursor>`, 312 Action: func(cctx *cli.Context) error { 313 h1 := cctx.String("host1") 314 h2 := cctx.String("host2") 315 316 url1 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h1) 317 url2 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h2) 318 319 d := websocket.DefaultDialer 320 321 eventChans := []chan *comatproto.SyncSubscribeRepos_Commit{ 322 make(chan *comatproto.SyncSubscribeRepos_Commit, 2), 323 make(chan *comatproto.SyncSubscribeRepos_Commit, 2), 324 } 325 326 buffers := []map[string][]*comatproto.SyncSubscribeRepos_Commit{ 327 make(map[string][]*comatproto.SyncSubscribeRepos_Commit), 328 make(map[string][]*comatproto.SyncSubscribeRepos_Commit), 329 } 330 331 addToBuffer := func(n int, event *comatproto.SyncSubscribeRepos_Commit) { 332 buffers[n][event.Repo] = append(buffers[n][event.Repo], event) 333 } 334 335 pll := func(ll *lexutil.LexLink) string { 336 if ll == nil { 337 return "<nil>" 338 } 339 return ll.String() 340 } 341 342 findMatchAndRemove := func(n int, event *comatproto.SyncSubscribeRepos_Commit) (*comatproto.SyncSubscribeRepos_Commit, error) { 343 buf := buffers[n] 344 slice, ok := buf[event.Repo] 345 if !ok || len(slice) == 0 { 346 return nil, nil 347 } 348 349 for i, ev := range slice { 350 if ev.Commit == event.Commit { 351 _ = pll 352 /* TODO: prev is no longer included in #commit messages; could use prevData or rev? 353 if pll(ev.Prev) != pll(event.Prev) { 354 // same commit different prev?? 355 return nil, fmt.Errorf("matched event with same commit but different prev: (%d) %d - %d", n, ev.Seq, event.Seq) 356 } 357 */ 358 } 359 360 if i != 0 { 361 fmt.Printf("detected skipped event: %d (%d)\n", slice[0].Seq, i) 362 } 363 364 slice = slice[i+1:] 365 buf[event.Repo] = slice 366 return ev, nil 367 } 368 369 return nil, fmt.Errorf("did not find matching event despite having events in buffer") 370 } 371 372 printCurrentDelta := func() { 373 var a, b int 374 for _, sl := range buffers[0] { 375 a += len(sl) 376 } 377 for _, sl := range buffers[1] { 378 b += len(sl) 379 } 380 381 fmt.Printf("%d %d\n", a, b) 382 } 383 384 printDetailedDelta := func() { 385 for did, sl := range buffers[0] { 386 osl := buffers[1][did] 387 if len(osl) > 0 && len(sl) > 0 { 388 fmt.Printf("%s had mismatched events on both streams (%d, %d)\n", did, len(sl), len(osl)) 389 } 390 391 } 392 } 393 394 // Create two goroutines for reading events from two URLs 395 for i, url := range []string{url1, url2} { 396 go func(i int, url string) { 397 con, _, err := d.Dial(url, http.Header{}) 398 if err != nil { 399 log.Error("Dial failure", "i", i, "url", url, "err", err) 400 os.Exit(1) 401 } 402 403 ctx := context.TODO() 404 rsc := &events.RepoStreamCallbacks{ 405 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 406 eventChans[i] <- evt 407 return nil 408 }, 409 // TODO: all the other Repo* event types 410 Error: func(evt *events.ErrorFrame) error { 411 return fmt.Errorf("%s: %s", evt.Error, evt.Message) 412 }, 413 } 414 seqScheduler := sequential.NewScheduler(fmt.Sprintf("debug-stream-%d", i+1), rsc.EventHandler) 415 if err := events.HandleRepoStream(ctx, con, seqScheduler, nil); err != nil { 416 log.Error("HandleRepoStream failure", "i", i, "url", url, "err", err) 417 os.Exit(1) 418 } 419 }(i, url) 420 } 421 422 ch := make(chan os.Signal, 1) 423 signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT) 424 425 // Compare events from the two URLs 426 for { 427 select { 428 case event := <-eventChans[0]: 429 partner, err := findMatchAndRemove(1, event) 430 if err != nil { 431 fmt.Println("checking for match failed: ", err) 432 continue 433 } 434 if partner == nil { 435 addToBuffer(0, event) 436 } else { 437 // the good case 438 fmt.Println("Match found") 439 } 440 441 case event := <-eventChans[1]: 442 partner, err := findMatchAndRemove(0, event) 443 if err != nil { 444 fmt.Println("checking for match failed: ", err) 445 continue 446 } 447 if partner == nil { 448 addToBuffer(1, event) 449 } else { 450 // the good case 451 fmt.Println("Match found") 452 } 453 case <-ch: 454 printDetailedDelta() 455 /* 456 b, err := json.Marshal(buffers) 457 if err != nil { 458 return err 459 } 460 461 fmt.Println(string(b)) 462 */ 463 return nil 464 } 465 466 printCurrentDelta() 467 } 468 }, 469} 470 471var debugFeedGenCmd = &cli.Command{ 472 Name: "debug-feed", 473 ArgsUsage: "<at-uri>", 474 Action: func(cctx *cli.Context) error { 475 xrpcc, err := cliutil.GetXrpcClient(cctx, true) 476 if err != nil { 477 return err 478 } 479 480 didr := cliutil.GetDidResolver(cctx) 481 482 uri := cctx.Args().First() 483 puri, err := util.ParseAtUri(uri) 484 if err != nil { 485 return err 486 } 487 488 ctx := context.TODO() 489 490 out, err := atproto.RepoGetRecord(ctx, xrpcc, "", puri.Collection, puri.Did, puri.Rkey) 491 if err != nil { 492 return fmt.Errorf("getting record: %w", err) 493 } 494 495 fgr, ok := out.Value.Val.(*bsky.FeedGenerator) 496 if !ok { 497 return fmt.Errorf("invalid feedgen record") 498 } 499 500 fmt.Println("Feed DID is: ", fgr.Did) 501 doc, err := didr.GetDocument(ctx, fgr.Did) 502 if err != nil { 503 return err 504 } 505 506 fmt.Println("Got service did document:") 507 b, err := json.MarshalIndent(doc, "", " ") 508 if err != nil { 509 return err 510 } 511 fmt.Println(string(b)) 512 513 var ss *did.Service 514 for _, s := range doc.Service { 515 if s.ID.String() == "#bsky_fg" { 516 cp := s 517 ss = &cp 518 break 519 } 520 } 521 522 if ss == nil { 523 return fmt.Errorf("No '#bsky_fg' service entry found in feedgens DID document") 524 } 525 526 fmt.Println("Service endpoint is: ", ss.ServiceEndpoint) 527 528 fgclient := &xrpc.Client{ 529 Host: ss.ServiceEndpoint, 530 } 531 532 desc, err := bsky.FeedDescribeFeedGenerator(ctx, fgclient) 533 if err != nil { 534 return err 535 } 536 537 fmt.Printf("Found %d feeds at discovered endpoint\n", len(desc.Feeds)) 538 var found bool 539 for _, f := range desc.Feeds { 540 fmt.Println("Feed: ", f.Uri) 541 if f.Uri == uri { 542 found = true 543 break 544 } 545 } 546 547 if !found { 548 return fmt.Errorf("specified feed was not present in linked feedGenerators 'describe' method output") 549 } 550 551 skel, err := bsky.FeedGetFeedSkeleton(ctx, fgclient, "", uri, 30) 552 if err != nil { 553 return fmt.Errorf("failed to fetch feed skeleton: %w", err) 554 } 555 556 if len(skel.Feed) > 30 { 557 return fmt.Errorf("feedgen not respecting limit param (returned %d posts)", len(skel.Feed)) 558 } 559 560 if len(skel.Feed) == 0 { 561 return fmt.Errorf("feedgen response is empty (might be expected since we aren't authed)") 562 } 563 564 fmt.Println("Feed response looks good!") 565 566 seen := make(map[string]bool) 567 for _, p := range skel.Feed { 568 seen[p.Post] = true 569 } 570 571 curs := skel.Cursor 572 for i := 0; i < 10 && curs != nil; i++ { 573 fmt.Println("Response had cursor: ", *curs) 574 nresp, err := bsky.FeedGetFeedSkeleton(ctx, fgclient, *curs, uri, 10) 575 if err != nil { 576 return fmt.Errorf("fetching paginated feed failed: %w", err) 577 } 578 579 fmt.Printf("Got %d posts from cursored query\n", len(nresp.Feed)) 580 581 if len(nresp.Feed) > 10 { 582 return fmt.Errorf("got more posts than we requested") 583 } 584 585 for _, p := range nresp.Feed { 586 if seen[p.Post] { 587 return fmt.Errorf("duplicate post in response: %s", p.Post) 588 } 589 590 seen[p.Post] = true 591 } 592 593 if len(nresp.Feed) == 0 || nresp.Cursor == nil { 594 break 595 } 596 597 curs = nresp.Cursor 598 } 599 600 return nil 601 }, 602} 603var debugFeedViewCmd = &cli.Command{ 604 Name: "view-feed", 605 Usage: "<at-uri>", 606 Action: func(cctx *cli.Context) error { 607 xrpcc, err := cliutil.GetXrpcClient(cctx, true) 608 if err != nil { 609 return err 610 } 611 612 didr := cliutil.GetDidResolver(cctx) 613 614 uri := cctx.Args().First() 615 puri, err := util.ParseAtUri(uri) 616 if err != nil { 617 return err 618 } 619 620 ctx := context.TODO() 621 622 out, err := atproto.RepoGetRecord(ctx, xrpcc, "", puri.Collection, puri.Did, puri.Rkey) 623 if err != nil { 624 return fmt.Errorf("getting record: %w", err) 625 } 626 627 fgr, ok := out.Value.Val.(*bsky.FeedGenerator) 628 if !ok { 629 return fmt.Errorf("invalid feedgen record") 630 } 631 632 doc, err := didr.GetDocument(ctx, fgr.Did) 633 if err != nil { 634 return err 635 } 636 637 var ss *did.Service 638 for _, s := range doc.Service { 639 if s.ID.String() == "#bsky_fg" { 640 cp := s 641 ss = &cp 642 break 643 } 644 } 645 646 if ss == nil { 647 return fmt.Errorf("No '#bsky_fg' service entry found in feedgens DID document") 648 } 649 650 fgclient := &xrpc.Client{ 651 Host: ss.ServiceEndpoint, 652 } 653 654 cache, err := loadCache("postcache.json") 655 if err != nil { 656 return err 657 } 658 var cacheUpdate bool 659 660 var cursor string 661 getPage := func(curs string) ([]*bsky.FeedDefs_PostView, error) { 662 skel, err := bsky.FeedGetFeedSkeleton(ctx, fgclient, cursor, uri, 30) 663 if err != nil { 664 return nil, fmt.Errorf("failed to fetch feed skeleton: %w", err) 665 } 666 667 if skel.Cursor != nil { 668 cursor = *skel.Cursor 669 } 670 671 var posts []*bsky.FeedDefs_PostView 672 for _, fp := range skel.Feed { 673 cached, ok := cache[fp.Post] 674 if ok { 675 posts = append(posts, cached) 676 continue 677 } 678 fps, err := bsky.FeedGetPosts(ctx, xrpcc, []string{fp.Post}) 679 if err != nil { 680 return nil, err 681 } 682 683 if len(fps.Posts) == 0 { 684 fmt.Println("FAILED TO GET POST: ", fp.Post) 685 continue 686 } 687 p := fps.Posts[0] 688 rec := p.Record.Val.(*bsky.FeedPost) 689 rec.Embed = nil // nil out embeds since they sometimes fail to json marshal... 690 posts = append(posts, p) 691 cache[fp.Post] = p 692 cacheUpdate = true 693 } 694 695 return posts, nil 696 } 697 698 printPosts := func(posts []*bsky.FeedDefs_PostView) { 699 for _, p := range posts { 700 fp, ok := p.Record.Val.(*bsky.FeedPost) 701 if !ok { 702 fmt.Printf("ERROR: Post had invalid record type: %T\n", p.Record.Val) 703 continue 704 } 705 text := fp.Text 706 text = strings.Replace(text, "\n", " ", -1) 707 if len(text) > 70 { 708 text = text[:70] + "..." 709 } 710 711 dn := p.Author.Handle 712 if p.Author.DisplayName != nil { 713 dn = *p.Author.DisplayName 714 } 715 716 fmt.Printf("%s: %s\n", dn, text) 717 } 718 } 719 720 seen := make(map[string]bool) 721 for i := 1; i < 5; i++ { 722 fmt.Printf("PAGE %d - cursor: %s\n", i, cursor) 723 posts, err := getPage(cursor) 724 if err != nil { 725 return err 726 } 727 var alreadySeen int 728 for _, p := range posts { 729 if seen[p.Uri] { 730 alreadySeen++ 731 } 732 seen[p.Uri] = true 733 } 734 fmt.Printf("Already saw %d / %d posts in page 1\n", alreadySeen, len(posts)) 735 printPosts(posts) 736 fmt.Println("") 737 fmt.Println("") 738 } 739 740 if cacheUpdate { 741 if err := saveCache("postcache.json", cache); err != nil { 742 return err 743 } 744 } 745 746 return nil 747 }, 748} 749 750func loadCache(filename string) (map[string]*bsky.FeedDefs_PostView, error) { 751 var data map[string]*bsky.FeedDefs_PostView 752 753 jsonFile, err := os.Open(filename) 754 if err != nil { 755 if os.IsNotExist(err) { 756 return make(map[string]*bsky.FeedDefs_PostView), nil 757 } 758 759 return nil, fmt.Errorf("failed to open file: %w", err) 760 } 761 defer jsonFile.Close() 762 763 byteValue, err := io.ReadAll(jsonFile) 764 if err != nil { 765 return nil, fmt.Errorf("failed to read file: %w", err) 766 } 767 768 err = json.Unmarshal(byteValue, &data) 769 if err != nil { 770 return nil, fmt.Errorf("failed to unmarshal json: %w", err) 771 } 772 773 return data, nil 774} 775 776func saveCache(filename string, data map[string]*bsky.FeedDefs_PostView) error { 777 file, err := json.MarshalIndent(data, "", " ") 778 if err != nil { 779 return fmt.Errorf("failed to marshal json: %w", err) 780 } 781 782 err = os.WriteFile(filename, file, 0644) 783 if err != nil { 784 return fmt.Errorf("failed to write file: %w", err) 785 } 786 787 return nil 788} 789 790var debugGetRepoCmd = &cli.Command{ 791 Name: "get-repo", 792 Flags: []cli.Flag{}, 793 ArgsUsage: `<did>`, 794 Action: func(cctx *cli.Context) error { 795 xrpcc, err := cliutil.GetXrpcClient(cctx, false) 796 if err != nil { 797 return err 798 } 799 800 ctx := context.TODO() 801 802 repobytes, err := comatproto.SyncGetRepo(ctx, xrpcc, cctx.Args().First(), "") 803 if err != nil { 804 return fmt.Errorf("getting repo: %w", err) 805 } 806 807 rep, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repobytes)) 808 if err != nil { 809 return err 810 } 811 812 fmt.Println("Rev: ", rep.SignedCommit().Rev) 813 var count int 814 if err := rep.ForEach(ctx, "", func(k string, v cid.Cid) error { 815 rec, err := rep.Blockstore().Get(ctx, v) 816 if err != nil { 817 return fmt.Errorf("getting record %q: %w", k, err) 818 } 819 820 count++ 821 _ = rec 822 return nil 823 }); err != nil { 824 return err 825 } 826 fmt.Printf("scanned %d records\n", count) 827 828 return nil 829 }, 830} 831 832var debugCompareReposCmd = &cli.Command{ 833 Name: "compare-repos", 834 Flags: []cli.Flag{ 835 &cli.StringFlag{ 836 Name: "host-1", 837 Usage: "method, hostname, and port of PDS instance", 838 Value: "https://bsky.social", 839 }, 840 &cli.StringFlag{ 841 Name: "host-2", 842 Usage: "method, hostname, and port of PDS instance", 843 Value: "https://bsky.network", 844 }, 845 }, 846 ArgsUsage: `<did>`, 847 Action: func(cctx *cli.Context) error { 848 ctx := cctx.Context 849 did, err := syntax.ParseAtIdentifier(cctx.Args().First()) 850 if err != nil { 851 return err 852 } 853 854 wg := sync.WaitGroup{} 855 wg.Add(2) 856 857 xrpc1 := xrpc.Client{ 858 Host: cctx.String("host-1"), 859 Client: &http.Client{ 860 Timeout: 15 * time.Minute, 861 }, 862 } 863 864 if !cctx.IsSet("host-1") { 865 dir := identity.DefaultDirectory() 866 ident, err := dir.Lookup(ctx, *did) 867 if err != nil { 868 return err 869 } 870 871 xrpc1.Host = ident.PDSEndpoint() 872 } 873 874 xrpc2 := xrpc.Client{ 875 Host: cctx.String("host-2"), 876 Client: &http.Client{ 877 Timeout: 15 * time.Minute, 878 }, 879 } 880 881 var rep1 *repo.Repo 882 go func() { 883 defer wg.Done() 884 logger := log.With("host", cctx.String("host-1")) 885 repo1bytes, err := comatproto.SyncGetRepo(ctx, &xrpc1, did.String(), "") 886 if err != nil { 887 logger.Error("getting repo", "err", err) 888 os.Exit(1) 889 return 890 } 891 892 rep1, err = repo.ReadRepoFromCar(ctx, bytes.NewReader(repo1bytes)) 893 if err != nil { 894 logger.Error("reading repo", "err", err, "bytes", len(repo1bytes)) 895 os.Exit(1) 896 return 897 } 898 }() 899 900 var rep2 *repo.Repo 901 go func() { 902 defer wg.Done() 903 logger := log.With("host", cctx.String("host-2")) 904 repo2bytes, err := comatproto.SyncGetRepo(ctx, &xrpc2, did.String(), "") 905 if err != nil { 906 logger.Error("getting repo", "err", err) 907 os.Exit(1) 908 return 909 } 910 911 rep2, err = repo.ReadRepoFromCar(ctx, bytes.NewReader(repo2bytes)) 912 if err != nil { 913 logger.Error("reading repo", "err", err, "bytes", len(repo2bytes)) 914 os.Exit(1) 915 return 916 } 917 }() 918 919 wg.Wait() 920 921 cids1 := []cid.Cid{} 922 blocks1 := []blocks.Block{} 923 924 fmt.Println("Host 1 Results") 925 fmt.Println("Rev: ", rep1.SignedCommit().Rev) 926 var count int 927 if err := rep1.ForEach(ctx, "", func(k string, v cid.Cid) error { 928 cids1 = append(cids1, v) 929 rec, err := rep1.Blockstore().Get(ctx, v) 930 if err != nil { 931 return fmt.Errorf("getting record %q: %w", k, err) 932 } 933 blocks1 = append(blocks1, rec) 934 935 count++ 936 _ = rec 937 return nil 938 }); err != nil { 939 return err 940 } 941 fmt.Printf("scanned %d records\n", count) 942 943 cids2 := []cid.Cid{} 944 blocks2 := []blocks.Block{} 945 946 fmt.Println("\nHost 2 Results") 947 fmt.Println("Rev: ", rep2.SignedCommit().Rev) 948 count = 0 949 if err := rep2.ForEach(ctx, "", func(k string, v cid.Cid) error { 950 cids2 = append(cids2, v) 951 rec, err := rep2.Blockstore().Get(ctx, v) 952 if err != nil { 953 return fmt.Errorf("getting record %q: %w", k, err) 954 } 955 blocks2 = append(blocks2, rec) 956 957 count++ 958 _ = rec 959 return nil 960 }); err != nil { 961 return err 962 } 963 fmt.Printf("scanned %d records\n", count) 964 965 fmt.Println("\nComparing CIDs") 966 hasBadCid := false 967 for i, c1 := range cids1 { 968 if c1 != cids2[i] { 969 fmt.Printf("CID mismatch at index %d: %s != %s\n", i, c1, cids2[i]) 970 hasBadCid = true 971 } 972 } 973 974 if !hasBadCid { 975 fmt.Println("All CIDs match!") 976 } 977 978 fmt.Println("Comparing blocks") 979 hasBadBlock := false 980 for i, b1 := range blocks1 { 981 if !bytes.Equal(b1.RawData(), blocks2[i].RawData()) { 982 fmt.Printf("Block mismatch at index %d Host 1 Cid (%s) Host 2 Cid (%s)\n", i, b1.Cid().String(), blocks2[i].Cid().String()) 983 hasBadBlock = true 984 } 985 } 986 987 if !hasBadBlock { 988 fmt.Println("All blocks match!") 989 } 990 991 if hasBadBlock || hasBadCid { 992 return fmt.Errorf("mismatched blocks or cids") 993 } 994 995 return nil 996 }, 997}