this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Improve handling of bad records (#124)

And some other misc improvements.

authored by

Whyrusleeping and committed by
GitHub
b96408df 073d04c9

+176 -59
+1 -1
bgs/bgs.go
··· 348 348 // TODO: if the user is already in the 'slow' path, we shouldnt even bother trying to fast path this event 349 349 350 350 if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, (*cid.Cid)(evt.Prev), evt.Blocks); err != nil { 351 - log.Warnw("failed handling event", "err", err, "host", host.Host, "seq", evt.Seq) 351 + log.Warnw("failed handling event", "err", err, "host", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", evt.Prev.String(), "commit", evt.Commit.String()) 352 352 if !errors.Is(err, carstore.ErrRepoBaseMismatch) { 353 353 return fmt.Errorf("handle user event failed: %w", err) 354 354 }
+95
cmd/gosky/debug.go
··· 25 25 Description: "a set of debugging utilities for atproto", 26 26 Subcommands: []*cli.Command{ 27 27 inspectEventCmd, 28 + debugStreamCmd, 28 29 }, 29 30 } 30 31 ··· 136 137 return nil 137 138 }, 138 139 } 140 + 141 + type eventInfo struct { 142 + LastCid cid.Cid 143 + LastSeq int64 144 + } 145 + 146 + var debugStreamCmd = &cli.Command{ 147 + Name: "debug-stream", 148 + Flags: []cli.Flag{ 149 + &cli.StringFlag{ 150 + Name: "host", 151 + Required: true, 152 + }, 153 + &cli.BoolFlag{ 154 + Name: "dump-raw-blocks", 155 + }, 156 + }, 157 + Action: func(cctx *cli.Context) error { 158 + n, err := strconv.Atoi(cctx.Args().First()) 159 + if err != nil { 160 + return err 161 + } 162 + 163 + h := cctx.String("host") 164 + 165 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", h, n-1) 166 + d := websocket.DefaultDialer 167 + con, _, err := d.Dial(url, http.Header{}) 168 + if err != nil { 169 + return fmt.Errorf("dial failure: %w", err) 170 + } 171 + 172 + infos := make(map[string]*eventInfo) 173 + 174 + ctx := context.TODO() 175 + err = events.HandleRepoStream(ctx, con, &events.RepoStreamCallbacks{ 176 + RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 177 + 178 + fmt.Printf("\rChecking seq: %d ", evt.Seq) 179 + 180 + r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 181 + if err != nil { 182 + fmt.Printf("\nEvent at sequence %d had an invalid repo slice: %s\n", evt.Seq, err) 183 + return nil 184 + } else { 185 + prev, err := r.PrevCommit(ctx) 186 + if err != nil { 187 + return err 188 + } 189 + 190 + var cs, es string 191 + if prev != nil { 192 + cs = prev.String() 193 + } 194 + 195 + if evt.Prev != nil { 196 + es = evt.Prev.String() 197 + } 198 + 199 + if cs != es { 200 + fmt.Printf("\nEvent at sequence %d has mismatch between slice prev and struct prev: %s != %s\n", evt.Seq, prev, evt.Prev) 201 + } 202 + } 203 + 204 + cur, ok := infos[evt.Repo] 205 + if ok { 206 + if cur.LastCid.String() != evt.Prev.String() { 207 + fmt.Println() 208 + fmt.Printf("Event at sequence %d, repo=%s had prev=%s head=%s, but last commit we saw was %s (seq=%d)\n", evt.Seq, evt.Repo, evt.Prev.String(), evt.Commit.String(), evt.Commit.String(), cur.LastSeq) 209 + } 210 + } 211 + 212 + infos[evt.Repo] = &eventInfo{ 213 + LastCid: cid.Cid(evt.Commit), 214 + LastSeq: evt.Seq, 215 + } 216 + 217 + return nil 218 + }, 219 + RepoInfo: func(evt *comatproto.SyncSubscribeRepos_Info) error { 220 + return nil 221 + }, 222 + // TODO: all the other Repo* event types 223 + Error: func(evt *events.ErrorFrame) error { 224 + return fmt.Errorf("%s: %s", evt.Error, evt.Message) 225 + }, 226 + }) 227 + if err != nil { 228 + return err 229 + } 230 + 231 + return nil 232 + }, 233 + }
+1 -1
cmd/gosky/main.go
··· 920 920 if evt.Prev != nil && evt.Prev.Defined() { 921 921 pstr = evt.Prev.String() 922 922 } 923 - fmt.Printf("(%d) RepoAppend: %s (%s -> %s)\n", evt.Seq, evt.Repo, pstr, evt.Commit) 923 + fmt.Printf("(%d) RepoAppend: %s (%s -> %s)\n", evt.Seq, evt.Repo, pstr, evt.Commit.String()) 924 924 } 925 925 926 926 return nil
+1 -1
events/dbpersist.go
··· 38 38 39 39 type RepoOpRecord struct { 40 40 ID uint `gorm:"primarykey"` 41 - RepoEventRecordID uint 41 + RepoEventRecordID uint `gorm:"index"` 42 42 Path string 43 43 Action string 44 44 Rec *util.DbCID
+6 -3
events/repostream.go
··· 36 36 if err != nil { 37 37 e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err) 38 38 log.Error(e) 39 - return nil 39 + continue 40 40 } 41 41 42 42 if lexutil.LexLink(rc) != *op.Cid { 43 + // TODO: do we even error here? 43 44 return fmt.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid) 44 45 } 45 46 46 47 if err := cb(ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil { 47 - return err 48 + log.Errorf("event consumer callback (%s): %s", ek, err) 49 + continue 48 50 } 49 51 50 52 case repomgr.EvtKindDeleteRecord: 51 53 if err := cb(ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil { 52 - return err 54 + log.Errorf("event consumer callback (%s): %s", ek, err) 55 + continue 53 56 } 54 57 } 55 58 }
+11 -3
indexer/crawler.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "fmt" 5 6 6 7 comatproto "github.com/bluesky-social/indigo/api/atproto" 7 8 "github.com/bluesky-social/indigo/models" ··· 20 21 complete chan util.Uid 21 22 22 23 doRepoCrawl func(context.Context, *crawlWork) error 24 + 25 + concurrency int 23 26 } 24 27 25 - func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error) *CrawlDispatcher { 28 + func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int) (*CrawlDispatcher, error) { 29 + if concurrency < 1 { 30 + return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency") 31 + } 32 + 26 33 return &CrawlDispatcher{ 27 34 ingest: make(chan *models.ActorInfo), 28 35 repoSync: make(chan *crawlWork), 29 36 complete: make(chan util.Uid), 30 37 catchup: make(chan *catchupJob), 31 38 doRepoCrawl: repoFn, 32 - } 39 + concurrency: concurrency, 40 + }, nil 33 41 } 34 42 35 43 func (c *CrawlDispatcher) Run() { 36 44 go c.mainLoop() 37 45 38 - for i := 0; i < 3; i++ { 46 + for i := 0; i < c.concurrency; i++ { 39 47 go c.fetchWorker() 40 48 } 41 49 }
+43 -32
indexer/indexer.go
··· 68 68 } 69 69 70 70 if crawl { 71 - ix.Crawler = NewCrawlDispatcher(ix.FetchAndIndexRepo) 71 + c, err := NewCrawlDispatcher(ix.FetchAndIndexRepo, 10) 72 + if err != nil { 73 + return nil, err 74 + } 72 75 76 + ix.Crawler = c 73 77 ix.Crawler.Run() 74 78 } 75 79 ··· 91 95 Cid: link, 92 96 }) 93 97 94 - switch op.Kind { 95 - case repomgr.EvtKindCreateRecord: 96 - if err := ix.crawlRecordReferences(ctx, &op); err != nil { 97 - return err 98 - } 99 - 100 - if ix.doAggregations { 101 - _, err := ix.handleRecordCreate(ctx, evt, &op, true) 102 - if err != nil { 103 - return fmt.Errorf("handle recordCreate: %w", err) 104 - } 105 - } 106 - case repomgr.EvtKindInitActor: 107 - if err := ix.handleInitActor(ctx, evt, &op); err != nil { 108 - return fmt.Errorf("handle initActor: %w", err) 109 - } 110 - case repomgr.EvtKindDeleteRecord: 111 - if ix.doAggregations { 112 - if err := ix.handleRecordDelete(ctx, evt, &op, true); err != nil { 113 - return fmt.Errorf("handle recordDelete: %w", err) 114 - } 115 - } 116 - case repomgr.EvtKindUpdateRecord: 117 - if ix.doAggregations { 118 - if err := ix.handleRecordUpdate(ctx, evt, &op, true); err != nil { 119 - return fmt.Errorf("handle recordCreate: %w", err) 120 - } 121 - } 122 - default: 123 - return fmt.Errorf("unrecognized repo event type: %q", op.Kind) 98 + if err := ix.handleRepoOp(ctx, evt, &op); err != nil { 99 + log.Errorw("failed to handle repo op", "err", err) 124 100 } 125 101 } 126 102 ··· 151 127 PrivUid: evt.User, 152 128 }); err != nil { 153 129 return fmt.Errorf("failed to push event: %s", err) 130 + } 131 + 132 + return nil 133 + } 134 + 135 + func (ix *Indexer) handleRepoOp(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error { 136 + switch op.Kind { 137 + case repomgr.EvtKindCreateRecord: 138 + if err := ix.crawlRecordReferences(ctx, op); err != nil { 139 + return err 140 + } 141 + 142 + if ix.doAggregations { 143 + _, err := ix.handleRecordCreate(ctx, evt, op, true) 144 + if err != nil { 145 + return fmt.Errorf("handle recordCreate: %w", err) 146 + } 147 + } 148 + case repomgr.EvtKindInitActor: 149 + if err := ix.handleInitActor(ctx, evt, op); err != nil { 150 + return fmt.Errorf("handle initActor: %w", err) 151 + } 152 + case repomgr.EvtKindDeleteRecord: 153 + if ix.doAggregations { 154 + if err := ix.handleRecordDelete(ctx, evt, op, true); err != nil { 155 + return fmt.Errorf("handle recordDelete: %w", err) 156 + } 157 + } 158 + case repomgr.EvtKindUpdateRecord: 159 + if ix.doAggregations { 160 + if err := ix.handleRecordUpdate(ctx, evt, op, true); err != nil { 161 + return fmt.Errorf("handle recordCreate: %w", err) 162 + } 163 + } 164 + default: 165 + return fmt.Errorf("unrecognized repo event type: %q", op.Kind) 154 166 } 155 167 156 168 return nil ··· 852 864 from = curHead.String() 853 865 } else { 854 866 span.SetAttributes(attribute.Bool("full", true)) 855 - 856 867 } 857 868 858 869 // TODO: max size on these? A malicious PDS could just send us a petabyte sized repo here and kill us
+16 -16
repomgr/repomgr.go
··· 511 511 ctx, span := otel.Tracer("repoman").Start(ctx, "HandleExternalUserEvent") 512 512 defer span.End() 513 513 514 - log.Infof("HandleExternalUserEvent: %d %d %s", pdsid, uid, prev) 514 + log.Infow("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "prev", prev) 515 515 516 516 unlock := rm.lockUser(ctx, uid) 517 517 defer unlock() ··· 542 542 if err := rm.kmgr.VerifyUserSignature(ctx, repoDid, scom.Sig, sb); err != nil { 543 543 return fmt.Errorf("signature check failed: %w", err) 544 544 } 545 - 546 - log.Infow("external event", "uid", uid) 547 545 548 546 var pcid cid.Cid 549 547 if prev != nil { ··· 885 883 return nil, err 886 884 } 887 885 888 - rec, err := lexutil.CborDecodeValue(blk.RawData()) 889 - if err != nil { 890 - if errors.Is(err, lexutil.ErrUnrecognizedType) { 891 - log.Warnf("failed processing repo diff: %s", err) 892 - return nil, nil 893 - } 894 - 895 - return nil, err 896 - } 897 - 898 886 kind := EvtKindCreateRecord 899 887 if op.Op == "mut" { 900 888 kind = EvtKindUpdateRecord 901 889 } 902 890 903 - return &RepoOp{ 891 + outop := &RepoOp{ 904 892 Kind: kind, 905 893 Collection: parts[0], 906 894 Rkey: parts[1], 907 895 RecCid: &op.NewCid, 908 - Record: rec, 909 - }, nil 896 + } 897 + 898 + rec, err := lexutil.CborDecodeValue(blk.RawData()) 899 + if err != nil { 900 + if !errors.Is(err, lexutil.ErrUnrecognizedType) { 901 + return nil, err 902 + } 903 + 904 + log.Warnf("failed processing repo diff: %s", err) 905 + } else { 906 + outop.Record = rec 907 + } 908 + 909 + return outop, nil 910 910 case "del": 911 911 return &RepoOp{ 912 912 Kind: EvtKindDeleteRecord,
+2 -2
xrpc/xrpc.go
··· 125 125 return fmt.Errorf("reading response body: %w", err) 126 126 } 127 127 } else { 128 - _, err := io.CopyN(buf, resp.Body, resp.ContentLength) 128 + n, err := io.CopyN(buf, resp.Body, resp.ContentLength) 129 129 if err != nil { 130 - return fmt.Errorf("reading length delimited response body: %w", err) 130 + return fmt.Errorf("reading length delimited response body (%d < %d): %w", n, resp.ContentLength, err) 131 131 } 132 132 } 133 133 } else {