this repo has no description
1package main
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "net/http"
10 "os"
11 "os/signal"
12 "strconv"
13 "strings"
14 "sync"
15 "syscall"
16 "time"
17
18 "github.com/bluesky-social/indigo/api/atproto"
19 comatproto "github.com/bluesky-social/indigo/api/atproto"
20 "github.com/bluesky-social/indigo/api/bsky"
21 "github.com/bluesky-social/indigo/atproto/identity"
22 "github.com/bluesky-social/indigo/atproto/syntax"
23 "github.com/bluesky-social/indigo/did"
24 "github.com/bluesky-social/indigo/events"
25 "github.com/bluesky-social/indigo/events/schedulers/sequential"
26 lexutil "github.com/bluesky-social/indigo/lex/util"
27 "github.com/bluesky-social/indigo/repo"
28 "github.com/bluesky-social/indigo/repomgr"
29 "github.com/bluesky-social/indigo/util"
30 "github.com/bluesky-social/indigo/util/cliutil"
31 "github.com/bluesky-social/indigo/xrpc"
32
33 "github.com/gorilla/websocket"
34 "github.com/ipfs/go-cid"
35 "github.com/ipfs/go-libipfs/blocks"
36 "github.com/ipld/go-car/v2"
37 cli "github.com/urfave/cli/v2"
38)
39
40var debugCmd = &cli.Command{
41 Name: "debug",
42 Usage: "a set of debugging utilities for atproto",
43 Subcommands: []*cli.Command{
44 inspectEventCmd,
45 debugStreamCmd,
46 debugFeedGenCmd,
47 debugFeedViewCmd,
48 compareStreamsCmd,
49 debugGetRepoCmd,
50 debugCompareReposCmd,
51 },
52}
53
54var inspectEventCmd = &cli.Command{
55 Name: "inspect-event",
56 Flags: []cli.Flag{
57 &cli.StringFlag{
58 Name: "host",
59 Required: true,
60 },
61 &cli.BoolFlag{
62 Name: "dump-raw-blocks",
63 },
64 },
65 ArgsUsage: `<cursor>`,
66 Action: func(cctx *cli.Context) error {
67 n, err := strconv.Atoi(cctx.Args().First())
68 if err != nil {
69 return err
70 }
71
72 h := cctx.String("host")
73
74 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", h, n-1)
75 d := websocket.DefaultDialer
76 con, _, err := d.Dial(url, http.Header{})
77 if err != nil {
78 return fmt.Errorf("dial failure: %w", err)
79 }
80
81 var errFoundIt = fmt.Errorf("gotem")
82
83 var match *comatproto.SyncSubscribeRepos_Commit
84
85 ctx := context.TODO()
86 rsc := &events.RepoStreamCallbacks{
87 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
88 n := int64(n)
89 if evt.Seq == n {
90 match = evt
91 return errFoundIt
92 }
93 if evt.Seq > n {
94 return fmt.Errorf("record not found in stream")
95 }
96
97 return nil
98 },
99 RepoInfo: func(evt *comatproto.SyncSubscribeRepos_Info) error {
100 return nil
101 },
102 // TODO: all the other Repo* event types
103 Error: func(evt *events.ErrorFrame) error {
104 return fmt.Errorf("%s: %s", evt.Error, evt.Message)
105 },
106 }
107
108 seqScheduler := sequential.NewScheduler("debug-inspect-event", rsc.EventHandler)
109 err = events.HandleRepoStream(ctx, con, seqScheduler, nil)
110 if err != errFoundIt {
111 return err
112 }
113
114 b, err := json.MarshalIndent(match, "", " ")
115 if err != nil {
116 return err
117 }
118 fmt.Println(string(b))
119
120 br, err := car.NewBlockReader(bytes.NewReader(match.Blocks))
121 if err != nil {
122 return err
123 }
124
125 fmt.Println("\nSlice Dump:")
126 fmt.Println("Root: ", br.Roots[0])
127 for {
128 blk, err := br.Next()
129 if err != nil {
130 if err == io.EOF {
131 break
132 }
133 return err
134 }
135
136 fmt.Println(blk.Cid())
137 if cctx.Bool("dump-raw-blocks") {
138 fmt.Printf("%x\n", blk.RawData())
139 }
140 }
141
142 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(match.Blocks))
143 if err != nil {
144 return fmt.Errorf("opening repo from slice: %w", err)
145 }
146
147 fmt.Println("\nOps: ")
148 for _, op := range match.Ops {
149 switch repomgr.EventKind(op.Action) {
150 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
151 rcid, _, err := r.GetRecord(ctx, op.Path)
152 if err != nil {
153 return fmt.Errorf("loading %q: %w", op.Path, err)
154 }
155 if rcid != cid.Cid(*op.Cid) {
156 return fmt.Errorf("mismatch in record cid %s != %s", rcid, *op.Cid)
157 }
158 fmt.Printf("%s (%s): %s\n", op.Action, op.Path, *op.Cid)
159 }
160 }
161
162 return nil
163 },
164}
165
// eventInfo records the most recent commit event observed for a single repo
// while debug-stream is running.
type eventInfo struct {
	// LastSeq is the Seq of that commit event.
	LastSeq int64
	// LastRev is the Rev of that commit event.
	LastRev string
}
170
171func cidStr(c *lexutil.LexLink) string {
172 if c == nil {
173 return "<nil>"
174 }
175
176 return c.String()
177}
178
179var debugStreamCmd = &cli.Command{
180 Name: "debug-stream",
181 Flags: []cli.Flag{
182 &cli.StringFlag{
183 Name: "host",
184 Required: true,
185 },
186 &cli.BoolFlag{
187 Name: "dump-raw-blocks",
188 },
189 },
190 ArgsUsage: `<cursor>`,
191 Action: func(cctx *cli.Context) error {
192 n, err := strconv.Atoi(cctx.Args().First())
193 if err != nil {
194 return err
195 }
196
197 h := cctx.String("host")
198
199 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", h, n)
200 d := websocket.DefaultDialer
201 con, _, err := d.Dial(url, http.Header{})
202 if err != nil {
203 return fmt.Errorf("dial failure: %w", err)
204 }
205
206 infos := make(map[string]*eventInfo)
207
208 var lastSeq int64 = -1
209 ctx := context.TODO()
210 rsc := &events.RepoStreamCallbacks{
211 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
212
213 fmt.Printf("\rChecking seq: %d ", evt.Seq)
214 if lastSeq > 0 && evt.Seq != lastSeq+1 {
215 fmt.Println("Gap in sequence numbers: ", lastSeq, evt.Seq)
216 }
217 lastSeq = evt.Seq
218
219 if !evt.TooBig {
220 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
221 if err != nil {
222 fmt.Printf("\nEvent at sequence %d had an invalid repo slice: %s\n", evt.Seq, err)
223 return nil
224 } else {
225 _ = r
226 /* "prev" is no longer included in #commit messages
227 prev, err := r.PrevCommit(ctx)
228 if err != nil {
229 return err
230 }
231
232 var cs, es string
233 if prev != nil {
234 cs = prev.String()
235 }
236
237 if evt.Prev != nil {
238 es = evt.Prev.String()
239 }
240
241 if !evt.Rebase && cs != es {
242 fmt.Printf("\nEvent at sequence %d has mismatch between slice prev and struct prev: %s != %s\n", evt.Seq, prev, evt.Prev)
243 }
244 */
245 }
246 }
247
248 cur, ok := infos[evt.Repo]
249 if ok {
250 if evt.Since != nil && cur.LastRev != *evt.Since {
251 /*
252 fmt.Println()
253 fmt.Printf("Event at sequence %d, repo=%s had since=%s, but last rev we saw was %s (seq=%d)\n", evt.Seq, evt.Repo, evt.Since, cur.LastRev, cur.LastSeq)
254 */
255 }
256 }
257
258 infos[evt.Repo] = &eventInfo{
259 LastSeq: evt.Seq,
260 LastRev: evt.Rev,
261 }
262
263 return nil
264 },
265 RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
266 fmt.Printf("\rChecking seq: %d ", evt.Seq)
267 if lastSeq > 0 && evt.Seq != lastSeq+1 {
268 fmt.Println("Gap in sequence numbers: ", lastSeq, evt.Seq)
269 }
270 lastSeq = evt.Seq
271 return nil
272 },
273 RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error {
274 fmt.Printf("\rChecking seq: %d ", evt.Seq)
275 if lastSeq > 0 && evt.Seq != lastSeq+1 {
276 fmt.Println("Gap in sequence numbers: ", lastSeq, evt.Seq)
277 }
278 lastSeq = evt.Seq
279 return nil
280 },
281 RepoInfo: func(evt *comatproto.SyncSubscribeRepos_Info) error {
282 return nil
283 },
284 // TODO: all the other Repo* event types
285 Error: func(evt *events.ErrorFrame) error {
286 return fmt.Errorf("%s: %s", evt.Error, evt.Message)
287 },
288 }
289 seqScheduler := sequential.NewScheduler("debug-stream", rsc.EventHandler)
290 err = events.HandleRepoStream(ctx, con, seqScheduler, nil)
291 if err != nil {
292 return err
293 }
294
295 return nil
296 },
297}
298
299var compareStreamsCmd = &cli.Command{
300 Name: "compare-streams",
301 Flags: []cli.Flag{
302 &cli.StringFlag{
303 Name: "host1",
304 Required: true,
305 },
306 &cli.StringFlag{
307 Name: "host2",
308 Required: true,
309 },
310 },
311 ArgsUsage: `<cursor>`,
312 Action: func(cctx *cli.Context) error {
313 h1 := cctx.String("host1")
314 h2 := cctx.String("host2")
315
316 url1 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h1)
317 url2 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h2)
318
319 d := websocket.DefaultDialer
320
321 eventChans := []chan *comatproto.SyncSubscribeRepos_Commit{
322 make(chan *comatproto.SyncSubscribeRepos_Commit, 2),
323 make(chan *comatproto.SyncSubscribeRepos_Commit, 2),
324 }
325
326 buffers := []map[string][]*comatproto.SyncSubscribeRepos_Commit{
327 make(map[string][]*comatproto.SyncSubscribeRepos_Commit),
328 make(map[string][]*comatproto.SyncSubscribeRepos_Commit),
329 }
330
331 addToBuffer := func(n int, event *comatproto.SyncSubscribeRepos_Commit) {
332 buffers[n][event.Repo] = append(buffers[n][event.Repo], event)
333 }
334
335 pll := func(ll *lexutil.LexLink) string {
336 if ll == nil {
337 return "<nil>"
338 }
339 return ll.String()
340 }
341
342 findMatchAndRemove := func(n int, event *comatproto.SyncSubscribeRepos_Commit) (*comatproto.SyncSubscribeRepos_Commit, error) {
343 buf := buffers[n]
344 slice, ok := buf[event.Repo]
345 if !ok || len(slice) == 0 {
346 return nil, nil
347 }
348
349 for i, ev := range slice {
350 if ev.Commit == event.Commit {
351 _ = pll
352 /* TODO: prev is no longer included in #commit messages; could use prevData or rev?
353 if pll(ev.Prev) != pll(event.Prev) {
354 // same commit different prev??
355 return nil, fmt.Errorf("matched event with same commit but different prev: (%d) %d - %d", n, ev.Seq, event.Seq)
356 }
357 */
358 }
359
360 if i != 0 {
361 fmt.Printf("detected skipped event: %d (%d)\n", slice[0].Seq, i)
362 }
363
364 slice = slice[i+1:]
365 buf[event.Repo] = slice
366 return ev, nil
367 }
368
369 return nil, fmt.Errorf("did not find matching event despite having events in buffer")
370 }
371
372 printCurrentDelta := func() {
373 var a, b int
374 for _, sl := range buffers[0] {
375 a += len(sl)
376 }
377 for _, sl := range buffers[1] {
378 b += len(sl)
379 }
380
381 fmt.Printf("%d %d\n", a, b)
382 }
383
384 printDetailedDelta := func() {
385 for did, sl := range buffers[0] {
386 osl := buffers[1][did]
387 if len(osl) > 0 && len(sl) > 0 {
388 fmt.Printf("%s had mismatched events on both streams (%d, %d)\n", did, len(sl), len(osl))
389 }
390
391 }
392 }
393
394 // Create two goroutines for reading events from two URLs
395 for i, url := range []string{url1, url2} {
396 go func(i int, url string) {
397 con, _, err := d.Dial(url, http.Header{})
398 if err != nil {
399 log.Error("Dial failure", "i", i, "url", url, "err", err)
400 os.Exit(1)
401 }
402
403 ctx := context.TODO()
404 rsc := &events.RepoStreamCallbacks{
405 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
406 eventChans[i] <- evt
407 return nil
408 },
409 // TODO: all the other Repo* event types
410 Error: func(evt *events.ErrorFrame) error {
411 return fmt.Errorf("%s: %s", evt.Error, evt.Message)
412 },
413 }
414 seqScheduler := sequential.NewScheduler(fmt.Sprintf("debug-stream-%d", i+1), rsc.EventHandler)
415 if err := events.HandleRepoStream(ctx, con, seqScheduler, nil); err != nil {
416 log.Error("HandleRepoStream failure", "i", i, "url", url, "err", err)
417 os.Exit(1)
418 }
419 }(i, url)
420 }
421
422 ch := make(chan os.Signal, 1)
423 signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)
424
425 // Compare events from the two URLs
426 for {
427 select {
428 case event := <-eventChans[0]:
429 partner, err := findMatchAndRemove(1, event)
430 if err != nil {
431 fmt.Println("checking for match failed: ", err)
432 continue
433 }
434 if partner == nil {
435 addToBuffer(0, event)
436 } else {
437 // the good case
438 fmt.Println("Match found")
439 }
440
441 case event := <-eventChans[1]:
442 partner, err := findMatchAndRemove(0, event)
443 if err != nil {
444 fmt.Println("checking for match failed: ", err)
445 continue
446 }
447 if partner == nil {
448 addToBuffer(1, event)
449 } else {
450 // the good case
451 fmt.Println("Match found")
452 }
453 case <-ch:
454 printDetailedDelta()
455 /*
456 b, err := json.Marshal(buffers)
457 if err != nil {
458 return err
459 }
460
461 fmt.Println(string(b))
462 */
463 return nil
464 }
465
466 printCurrentDelta()
467 }
468 },
469}
470
471var debugFeedGenCmd = &cli.Command{
472 Name: "debug-feed",
473 ArgsUsage: "<at-uri>",
474 Action: func(cctx *cli.Context) error {
475 xrpcc, err := cliutil.GetXrpcClient(cctx, true)
476 if err != nil {
477 return err
478 }
479
480 didr := cliutil.GetDidResolver(cctx)
481
482 uri := cctx.Args().First()
483 puri, err := util.ParseAtUri(uri)
484 if err != nil {
485 return err
486 }
487
488 ctx := context.TODO()
489
490 out, err := atproto.RepoGetRecord(ctx, xrpcc, "", puri.Collection, puri.Did, puri.Rkey)
491 if err != nil {
492 return fmt.Errorf("getting record: %w", err)
493 }
494
495 fgr, ok := out.Value.Val.(*bsky.FeedGenerator)
496 if !ok {
497 return fmt.Errorf("invalid feedgen record")
498 }
499
500 fmt.Println("Feed DID is: ", fgr.Did)
501 doc, err := didr.GetDocument(ctx, fgr.Did)
502 if err != nil {
503 return err
504 }
505
506 fmt.Println("Got service did document:")
507 b, err := json.MarshalIndent(doc, "", " ")
508 if err != nil {
509 return err
510 }
511 fmt.Println(string(b))
512
513 var ss *did.Service
514 for _, s := range doc.Service {
515 if s.ID.String() == "#bsky_fg" {
516 cp := s
517 ss = &cp
518 break
519 }
520 }
521
522 if ss == nil {
523 return fmt.Errorf("No '#bsky_fg' service entry found in feedgens DID document")
524 }
525
526 fmt.Println("Service endpoint is: ", ss.ServiceEndpoint)
527
528 fgclient := &xrpc.Client{
529 Host: ss.ServiceEndpoint,
530 }
531
532 desc, err := bsky.FeedDescribeFeedGenerator(ctx, fgclient)
533 if err != nil {
534 return err
535 }
536
537 fmt.Printf("Found %d feeds at discovered endpoint\n", len(desc.Feeds))
538 var found bool
539 for _, f := range desc.Feeds {
540 fmt.Println("Feed: ", f.Uri)
541 if f.Uri == uri {
542 found = true
543 break
544 }
545 }
546
547 if !found {
548 return fmt.Errorf("specified feed was not present in linked feedGenerators 'describe' method output")
549 }
550
551 skel, err := bsky.FeedGetFeedSkeleton(ctx, fgclient, "", uri, 30)
552 if err != nil {
553 return fmt.Errorf("failed to fetch feed skeleton: %w", err)
554 }
555
556 if len(skel.Feed) > 30 {
557 return fmt.Errorf("feedgen not respecting limit param (returned %d posts)", len(skel.Feed))
558 }
559
560 if len(skel.Feed) == 0 {
561 return fmt.Errorf("feedgen response is empty (might be expected since we aren't authed)")
562 }
563
564 fmt.Println("Feed response looks good!")
565
566 seen := make(map[string]bool)
567 for _, p := range skel.Feed {
568 seen[p.Post] = true
569 }
570
571 curs := skel.Cursor
572 for i := 0; i < 10 && curs != nil; i++ {
573 fmt.Println("Response had cursor: ", *curs)
574 nresp, err := bsky.FeedGetFeedSkeleton(ctx, fgclient, *curs, uri, 10)
575 if err != nil {
576 return fmt.Errorf("fetching paginated feed failed: %w", err)
577 }
578
579 fmt.Printf("Got %d posts from cursored query\n", len(nresp.Feed))
580
581 if len(nresp.Feed) > 10 {
582 return fmt.Errorf("got more posts than we requested")
583 }
584
585 for _, p := range nresp.Feed {
586 if seen[p.Post] {
587 return fmt.Errorf("duplicate post in response: %s", p.Post)
588 }
589
590 seen[p.Post] = true
591 }
592
593 if len(nresp.Feed) == 0 || nresp.Cursor == nil {
594 break
595 }
596
597 curs = nresp.Cursor
598 }
599
600 return nil
601 },
602}
603var debugFeedViewCmd = &cli.Command{
604 Name: "view-feed",
605 Usage: "<at-uri>",
606 Action: func(cctx *cli.Context) error {
607 xrpcc, err := cliutil.GetXrpcClient(cctx, true)
608 if err != nil {
609 return err
610 }
611
612 didr := cliutil.GetDidResolver(cctx)
613
614 uri := cctx.Args().First()
615 puri, err := util.ParseAtUri(uri)
616 if err != nil {
617 return err
618 }
619
620 ctx := context.TODO()
621
622 out, err := atproto.RepoGetRecord(ctx, xrpcc, "", puri.Collection, puri.Did, puri.Rkey)
623 if err != nil {
624 return fmt.Errorf("getting record: %w", err)
625 }
626
627 fgr, ok := out.Value.Val.(*bsky.FeedGenerator)
628 if !ok {
629 return fmt.Errorf("invalid feedgen record")
630 }
631
632 doc, err := didr.GetDocument(ctx, fgr.Did)
633 if err != nil {
634 return err
635 }
636
637 var ss *did.Service
638 for _, s := range doc.Service {
639 if s.ID.String() == "#bsky_fg" {
640 cp := s
641 ss = &cp
642 break
643 }
644 }
645
646 if ss == nil {
647 return fmt.Errorf("No '#bsky_fg' service entry found in feedgens DID document")
648 }
649
650 fgclient := &xrpc.Client{
651 Host: ss.ServiceEndpoint,
652 }
653
654 cache, err := loadCache("postcache.json")
655 if err != nil {
656 return err
657 }
658 var cacheUpdate bool
659
660 var cursor string
661 getPage := func(curs string) ([]*bsky.FeedDefs_PostView, error) {
662 skel, err := bsky.FeedGetFeedSkeleton(ctx, fgclient, cursor, uri, 30)
663 if err != nil {
664 return nil, fmt.Errorf("failed to fetch feed skeleton: %w", err)
665 }
666
667 if skel.Cursor != nil {
668 cursor = *skel.Cursor
669 }
670
671 var posts []*bsky.FeedDefs_PostView
672 for _, fp := range skel.Feed {
673 cached, ok := cache[fp.Post]
674 if ok {
675 posts = append(posts, cached)
676 continue
677 }
678 fps, err := bsky.FeedGetPosts(ctx, xrpcc, []string{fp.Post})
679 if err != nil {
680 return nil, err
681 }
682
683 if len(fps.Posts) == 0 {
684 fmt.Println("FAILED TO GET POST: ", fp.Post)
685 continue
686 }
687 p := fps.Posts[0]
688 rec := p.Record.Val.(*bsky.FeedPost)
689 rec.Embed = nil // nil out embeds since they sometimes fail to json marshal...
690 posts = append(posts, p)
691 cache[fp.Post] = p
692 cacheUpdate = true
693 }
694
695 return posts, nil
696 }
697
698 printPosts := func(posts []*bsky.FeedDefs_PostView) {
699 for _, p := range posts {
700 fp, ok := p.Record.Val.(*bsky.FeedPost)
701 if !ok {
702 fmt.Printf("ERROR: Post had invalid record type: %T\n", p.Record.Val)
703 continue
704 }
705 text := fp.Text
706 text = strings.Replace(text, "\n", " ", -1)
707 if len(text) > 70 {
708 text = text[:70] + "..."
709 }
710
711 dn := p.Author.Handle
712 if p.Author.DisplayName != nil {
713 dn = *p.Author.DisplayName
714 }
715
716 fmt.Printf("%s: %s\n", dn, text)
717 }
718 }
719
720 seen := make(map[string]bool)
721 for i := 1; i < 5; i++ {
722 fmt.Printf("PAGE %d - cursor: %s\n", i, cursor)
723 posts, err := getPage(cursor)
724 if err != nil {
725 return err
726 }
727 var alreadySeen int
728 for _, p := range posts {
729 if seen[p.Uri] {
730 alreadySeen++
731 }
732 seen[p.Uri] = true
733 }
734 fmt.Printf("Already saw %d / %d posts in page 1\n", alreadySeen, len(posts))
735 printPosts(posts)
736 fmt.Println("")
737 fmt.Println("")
738 }
739
740 if cacheUpdate {
741 if err := saveCache("postcache.json", cache); err != nil {
742 return err
743 }
744 }
745
746 return nil
747 },
748}
749
750func loadCache(filename string) (map[string]*bsky.FeedDefs_PostView, error) {
751 var data map[string]*bsky.FeedDefs_PostView
752
753 jsonFile, err := os.Open(filename)
754 if err != nil {
755 if os.IsNotExist(err) {
756 return make(map[string]*bsky.FeedDefs_PostView), nil
757 }
758
759 return nil, fmt.Errorf("failed to open file: %w", err)
760 }
761 defer jsonFile.Close()
762
763 byteValue, err := io.ReadAll(jsonFile)
764 if err != nil {
765 return nil, fmt.Errorf("failed to read file: %w", err)
766 }
767
768 err = json.Unmarshal(byteValue, &data)
769 if err != nil {
770 return nil, fmt.Errorf("failed to unmarshal json: %w", err)
771 }
772
773 return data, nil
774}
775
776func saveCache(filename string, data map[string]*bsky.FeedDefs_PostView) error {
777 file, err := json.MarshalIndent(data, "", " ")
778 if err != nil {
779 return fmt.Errorf("failed to marshal json: %w", err)
780 }
781
782 err = os.WriteFile(filename, file, 0644)
783 if err != nil {
784 return fmt.Errorf("failed to write file: %w", err)
785 }
786
787 return nil
788}
789
790var debugGetRepoCmd = &cli.Command{
791 Name: "get-repo",
792 Flags: []cli.Flag{},
793 ArgsUsage: `<did>`,
794 Action: func(cctx *cli.Context) error {
795 xrpcc, err := cliutil.GetXrpcClient(cctx, false)
796 if err != nil {
797 return err
798 }
799
800 ctx := context.TODO()
801
802 repobytes, err := comatproto.SyncGetRepo(ctx, xrpcc, cctx.Args().First(), "")
803 if err != nil {
804 return fmt.Errorf("getting repo: %w", err)
805 }
806
807 rep, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repobytes))
808 if err != nil {
809 return err
810 }
811
812 fmt.Println("Rev: ", rep.SignedCommit().Rev)
813 var count int
814 if err := rep.ForEach(ctx, "", func(k string, v cid.Cid) error {
815 rec, err := rep.Blockstore().Get(ctx, v)
816 if err != nil {
817 return fmt.Errorf("getting record %q: %w", k, err)
818 }
819
820 count++
821 _ = rec
822 return nil
823 }); err != nil {
824 return err
825 }
826 fmt.Printf("scanned %d records\n", count)
827
828 return nil
829 },
830}
831
832var debugCompareReposCmd = &cli.Command{
833 Name: "compare-repos",
834 Flags: []cli.Flag{
835 &cli.StringFlag{
836 Name: "host-1",
837 Usage: "method, hostname, and port of PDS instance",
838 Value: "https://bsky.social",
839 },
840 &cli.StringFlag{
841 Name: "host-2",
842 Usage: "method, hostname, and port of PDS instance",
843 Value: "https://bsky.network",
844 },
845 },
846 ArgsUsage: `<did>`,
847 Action: func(cctx *cli.Context) error {
848 ctx := cctx.Context
849 did, err := syntax.ParseAtIdentifier(cctx.Args().First())
850 if err != nil {
851 return err
852 }
853
854 wg := sync.WaitGroup{}
855 wg.Add(2)
856
857 xrpc1 := xrpc.Client{
858 Host: cctx.String("host-1"),
859 Client: &http.Client{
860 Timeout: 15 * time.Minute,
861 },
862 }
863
864 if !cctx.IsSet("host-1") {
865 dir := identity.DefaultDirectory()
866 ident, err := dir.Lookup(ctx, *did)
867 if err != nil {
868 return err
869 }
870
871 xrpc1.Host = ident.PDSEndpoint()
872 }
873
874 xrpc2 := xrpc.Client{
875 Host: cctx.String("host-2"),
876 Client: &http.Client{
877 Timeout: 15 * time.Minute,
878 },
879 }
880
881 var rep1 *repo.Repo
882 go func() {
883 defer wg.Done()
884 logger := log.With("host", cctx.String("host-1"))
885 repo1bytes, err := comatproto.SyncGetRepo(ctx, &xrpc1, did.String(), "")
886 if err != nil {
887 logger.Error("getting repo", "err", err)
888 os.Exit(1)
889 return
890 }
891
892 rep1, err = repo.ReadRepoFromCar(ctx, bytes.NewReader(repo1bytes))
893 if err != nil {
894 logger.Error("reading repo", "err", err, "bytes", len(repo1bytes))
895 os.Exit(1)
896 return
897 }
898 }()
899
900 var rep2 *repo.Repo
901 go func() {
902 defer wg.Done()
903 logger := log.With("host", cctx.String("host-2"))
904 repo2bytes, err := comatproto.SyncGetRepo(ctx, &xrpc2, did.String(), "")
905 if err != nil {
906 logger.Error("getting repo", "err", err)
907 os.Exit(1)
908 return
909 }
910
911 rep2, err = repo.ReadRepoFromCar(ctx, bytes.NewReader(repo2bytes))
912 if err != nil {
913 logger.Error("reading repo", "err", err, "bytes", len(repo2bytes))
914 os.Exit(1)
915 return
916 }
917 }()
918
919 wg.Wait()
920
921 cids1 := []cid.Cid{}
922 blocks1 := []blocks.Block{}
923
924 fmt.Println("Host 1 Results")
925 fmt.Println("Rev: ", rep1.SignedCommit().Rev)
926 var count int
927 if err := rep1.ForEach(ctx, "", func(k string, v cid.Cid) error {
928 cids1 = append(cids1, v)
929 rec, err := rep1.Blockstore().Get(ctx, v)
930 if err != nil {
931 return fmt.Errorf("getting record %q: %w", k, err)
932 }
933 blocks1 = append(blocks1, rec)
934
935 count++
936 _ = rec
937 return nil
938 }); err != nil {
939 return err
940 }
941 fmt.Printf("scanned %d records\n", count)
942
943 cids2 := []cid.Cid{}
944 blocks2 := []blocks.Block{}
945
946 fmt.Println("\nHost 2 Results")
947 fmt.Println("Rev: ", rep2.SignedCommit().Rev)
948 count = 0
949 if err := rep2.ForEach(ctx, "", func(k string, v cid.Cid) error {
950 cids2 = append(cids2, v)
951 rec, err := rep2.Blockstore().Get(ctx, v)
952 if err != nil {
953 return fmt.Errorf("getting record %q: %w", k, err)
954 }
955 blocks2 = append(blocks2, rec)
956
957 count++
958 _ = rec
959 return nil
960 }); err != nil {
961 return err
962 }
963 fmt.Printf("scanned %d records\n", count)
964
965 fmt.Println("\nComparing CIDs")
966 hasBadCid := false
967 for i, c1 := range cids1 {
968 if c1 != cids2[i] {
969 fmt.Printf("CID mismatch at index %d: %s != %s\n", i, c1, cids2[i])
970 hasBadCid = true
971 }
972 }
973
974 if !hasBadCid {
975 fmt.Println("All CIDs match!")
976 }
977
978 fmt.Println("Comparing blocks")
979 hasBadBlock := false
980 for i, b1 := range blocks1 {
981 if !bytes.Equal(b1.RawData(), blocks2[i].RawData()) {
982 fmt.Printf("Block mismatch at index %d Host 1 Cid (%s) Host 2 Cid (%s)\n", i, b1.Cid().String(), blocks2[i].Cid().String())
983 hasBadBlock = true
984 }
985 }
986
987 if !hasBadBlock {
988 fmt.Println("All blocks match!")
989 }
990
991 if hasBadBlock || hasBadCid {
992 return fmt.Errorf("mismatched blocks or cids")
993 }
994
995 return nil
996 },
997}