this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

misc tweaks made for rerunning bgs scrape (#192)

Until we get a proper 'bootstrap sync' protocol, I'm adding an env var to
allow us to skip rate limiting on the PDS.

Also cleaned up the handling of too-big events a bit.

authored by

Whyrusleeping and committed by
GitHub
2d17f95a 03c8d951

+124 -38
+6 -2
bgs/admin.go
··· 48 48 } 49 49 } 50 50 51 - if err := bgs.slurper.KillUpstreamConnection(host); err != nil { 51 + block := strings.ToLower(e.QueryParam("block")) == "true" 52 + 53 + if err := bgs.slurper.KillUpstreamConnection(host, block); err != nil { 52 54 if errors.Is(err, ErrNoActiveConnection) { 53 55 return &echo.HTTPError{ 54 56 Code: 400, ··· 58 60 return err 59 61 } 60 62 61 - return nil 63 + return e.JSON(200, map[string]any{ 64 + "success": "true", 65 + }) 62 66 }
+32 -5
bgs/bgs.go
··· 121 121 122 122 json.NewEncoder(w).Encode(out) 123 123 }) 124 + http.HandleFunc("/repodbg/crawl", func(w http.ResponseWriter, r *http.Request) { 125 + ctx := r.Context() 126 + did := r.FormValue("did") 127 + 128 + act, err := bgs.Index.GetUserOrMissing(ctx, did) 129 + if err != nil { 130 + w.WriteHeader(500) 131 + log.Errorf("failed to get user: %s", err) 132 + return 133 + } 134 + 135 + if err := bgs.Index.Crawler.Crawl(ctx, act); err != nil { 136 + w.WriteHeader(500) 137 + log.Errorf("failed to add user to crawler: %s", err) 138 + return 139 + } 140 + }) 124 141 http.HandleFunc("/repodbg/blocks", func(w http.ResponseWriter, r *http.Request) { 125 142 ctx := r.Context() 126 143 did := r.FormValue("did") ··· 154 171 155 172 w.WriteHeader(200) 156 173 w.Write(blk.RawData()) 157 - 158 174 }) 159 175 http.Handle("/prometheus", prometheusHandler()) 160 176 ··· 171 187 172 188 e.HTTPErrorHandler = func(err error, ctx echo.Context) { 173 189 log.Warnf("HANDLER ERROR: (%s) %s", ctx.Path(), err) 174 - ctx.Response().WriteHeader(500) 190 + switch err := err.(type) { 191 + case *echo.HTTPError: 192 + if err2 := ctx.JSON(err.Code, map[string]any{ 193 + "error": err.Message, 194 + }); err2 != nil { 195 + log.Errorf("Failed to write http error: %s", err2) 196 + } 197 + default: 198 + ctx.Response().WriteHeader(500) 199 + } 175 200 } 176 201 177 202 // TODO: this API is temporary until we formalize what we want here ··· 409 434 switch { 410 435 case env.RepoCommit != nil: 411 436 evt := env.RepoCommit 412 - log.Infof("bgs got repo append event %d from %q: %s\n", evt.Seq, host.Host, evt.Repo) 437 + log.Infof("bgs got repo append event %d from %q: %s", evt.Seq, host.Host, evt.Repo) 413 438 u, err := bgs.lookupUserByDid(ctx, evt.Repo) 414 439 if err != nil { 415 440 if !errors.Is(err, gorm.ErrRecordNotFound) { ··· 505 530 506 531 func (s *BGS) syncUserBlobs(ctx context.Context, pds *models.PDS, user bsutil.Uid, blobs []string) error { 507 532 if s.blobs == nil { 508 - 
log.Infof("blob syncing disabled") 533 + log.Debugf("blob syncing disabled") 509 534 return nil 510 535 } 511 536 ··· 516 541 517 542 for _, b := range blobs { 518 543 c := models.ClientForPds(pds) 544 + s.Index.ApplyPDSClientSettings(c) 519 545 blob, err := atproto.SyncGetBlob(ctx, c, b, did) 520 546 if err != nil { 521 547 return fmt.Errorf("fetching blob (%s, %s): %w", did, b, err) ··· 562 588 } 563 589 564 590 c := &xrpc.Client{Host: durl.String()} 591 + s.Index.ApplyPDSClientSettings(c) 565 592 566 593 if peering.ID == 0 { 567 594 // TODO: the case of handling a new user on a new PDS probably requires more thought ··· 623 650 624 651 if exu.Handle != handle { 625 652 // Users handle has changed, update 626 - if err := s.db.Model(User{}).Where("id = ?", exu.ID).Update("handle", peering.ID).Error; err != nil { 653 + if err := s.db.Model(User{}).Where("id = ?", exu.ID).Update("handle", handle).Error; err != nil { 627 654 return nil, fmt.Errorf("failed to update users handle: %w", err) 628 655 } 629 656
+24 -4
bgs/fedmgr.go
··· 108 108 return err 109 109 } 110 110 111 + if peering.Blocked { 112 + return fmt.Errorf("cannot subscribe to blocked pds") 113 + } 114 + 111 115 if peering.ID == 0 { 112 116 // New PDS! 113 117 npds := models.PDS{ ··· 233 237 234 238 rsc := &events.RepoStreamCallbacks{ 235 239 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 236 - log.Infow("got remote repo event", "host", host.Host, "repo", evt.Repo, "seq", evt.Seq) 240 + log.Debugw("got remote repo event", "host", host.Host, "repo", evt.Repo, "seq", evt.Seq) 237 241 if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{ 238 242 RepoCommit: evt, 239 243 }); err != nil { ··· 298 302 }, 299 303 // TODO: all the other event types (handle change, migration, etc) 300 304 Error: func(errf *events.ErrorFrame) error { 301 - return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message) 305 + switch errf.Error { 306 + case "FutureCursor": 307 + // if we get a FutureCursor frame, reset our sequence number for this host 308 + if err := s.db.Table("pds").Where("id = ?", host.ID).Update("cursor", 0).Error; err != nil { 309 + return err 310 + } 311 + 312 + return fmt.Errorf("got FutureCursor frame, reset cursor tracking for host") 313 + default: 314 + return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message) 315 + } 302 316 }, 303 317 } 304 318 ··· 323 337 324 338 var ErrNoActiveConnection = fmt.Errorf("no active connection to host") 325 339 326 - func (s *Slurper) KillUpstreamConnection(host string) error { 340 + func (s *Slurper) KillUpstreamConnection(host string, block bool) error { 327 341 s.lk.Lock() 328 342 defer s.lk.Unlock() 329 343 ··· 331 345 if !ok { 332 346 return fmt.Errorf("killing connection %q: %w", host, ErrNoActiveConnection) 333 347 } 348 + ac.cancel() 334 349 335 - ac.cancel() 350 + if block { 351 + if err := s.db.Model(models.PDS{}).Where("id = ?").UpdateColumn("blocked", true).Error; err != nil { 352 + return fmt.Errorf("failed to set host as blocked: %w", err) 
353 + } 354 + } 355 + 336 356 return nil 337 357 }
+8 -8
bgs/handlers.go
··· 30 30 31 31 return buf, nil 32 32 */ 33 - panic("nyi") 33 + return nil, fmt.Errorf("nyi") 34 34 } 35 35 36 36 func (s *BGS) handleComAtprotoSyncGetCommitPath(ctx context.Context, did string, earliest string, latest string) (*comatprototypes.SyncGetCommitPath_Output, error) { 37 - panic("nyi") 37 + return nil, fmt.Errorf("nyi") 38 38 } 39 39 40 40 func (s *BGS) handleComAtprotoSyncGetHead(ctx context.Context, did string) (*comatprototypes.SyncGetHead_Output, error) { ··· 54 54 } 55 55 56 56 func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, commit string, did string, rkey string) (io.Reader, error) { 57 - panic("nyi") 57 + return nil, fmt.Errorf("nyi") 58 58 } 59 59 60 60 func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, earliest string, latest string) (io.Reader, error) { ··· 92 92 } 93 93 94 94 func (s *BGS) handleComAtprotoSyncGetBlocks(ctx context.Context, cids []string, did string) (io.Reader, error) { 95 - panic("NYI") 95 + return nil, fmt.Errorf("NYI") 96 96 } 97 97 98 98 func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, host string) error { ··· 105 105 } 106 106 107 107 func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, hostname string) error { 108 - panic("NYI") 109 - //return s.slurper.SubscribeToPds(ctx, host, false) 108 + // TODO: 109 + return nil 110 110 } 111 111 112 112 func (s *BGS) handleComAtprotoSyncGetBlob(ctx context.Context, cid string, did string) (io.Reader, error) { ··· 123 123 } 124 124 125 125 func (s *BGS) handleComAtprotoSyncListBlobs(ctx context.Context, did string, earliest string, latest string) (*comatprototypes.SyncListBlobs_Output, error) { 126 - panic("NYI") 126 + return nil, fmt.Errorf("NYI") 127 127 } 128 128 129 129 func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor string, limit int) (*comatprototypes.SyncListRepos_Output, error) { 130 - panic("NYI") 130 + return nil, fmt.Errorf("NYI") 131 131 }
+3
carstore/bs.go
··· 110 110 } 111 111 112 112 func (uv *userView) Get(ctx context.Context, k cid.Cid) (blockformat.Block, error) { 113 + if !k.Defined() { 114 + return nil, fmt.Errorf("attempted to 'get' undefined cid") 115 + } 113 116 if uv.cache != nil { 114 117 blk, ok := uv.cache[k] 115 118 if ok {
+11 -1
cmd/bigsky/main.go
··· 18 18 "github.com/bluesky-social/indigo/plc" 19 19 "github.com/bluesky-social/indigo/repomgr" 20 20 "github.com/bluesky-social/indigo/version" 21 + "github.com/bluesky-social/indigo/xrpc" 21 22 22 23 _ "net/http/pprof" 23 24 ··· 90 91 }, 91 92 &cli.BoolFlag{ 92 93 Name: "aggregation", 93 - Value: true, 94 + Value: false, 94 95 }, 95 96 &cli.StringFlag{ 96 97 Name: "api-listen", ··· 185 186 ix, err := indexer.NewIndexer(db, notifman, evtman, cachedidr, repoman, true, cctx.Bool("aggregation")) 186 187 if err != nil { 187 188 return err 189 + } 190 + 191 + rlskip := os.Getenv("BSKY_SOCIAL_RATE_LIMIT_SKIP") 192 + ix.ApplyPDSClientSettings = func(c *xrpc.Client) { 193 + if c.Host == "https://bsky.social" && rlskip != "" { 194 + c.Headers = map[string]string{ 195 + "x-ratelimit-bypass": rlskip, 196 + } 197 + } 188 198 } 189 199 190 200 repoman.SetEventHandler(func(ctx context.Context, evt *repomgr.RepoEvent) {
+1 -1
events/consumer.go
··· 41 41 case xev.Error != nil && rsc.Error != nil: 42 42 return rsc.Error(xev.Error) 43 43 default: 44 - return fmt.Errorf("no know event in XRPCStreamEvent object") 44 + return nil 45 45 } 46 46 } 47 47
+22 -7
indexer/indexer.go
··· 9 9 10 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 11 bsky "github.com/bluesky-social/indigo/api/bsky" 12 - "github.com/bluesky-social/indigo/carstore" 13 12 "github.com/bluesky-social/indigo/did" 14 13 "github.com/bluesky-social/indigo/events" 15 14 lexutil "github.com/bluesky-social/indigo/lex/util" ··· 29 28 30 29 var log = logging.Logger("indexer") 31 30 31 + const MaxEventSliceLength = 1000000 32 + const MaxOpsSliceLength = 200 33 + 32 34 type Indexer struct { 33 35 db *gorm.DB 34 36 ··· 43 45 44 46 doAggregations bool 45 47 46 - SendRemoteFollow func(context.Context, string, uint) error 47 - CreateExternalUser func(context.Context, string) (*models.ActorInfo, error) 48 + SendRemoteFollow func(context.Context, string, uint) error 49 + CreateExternalUser func(context.Context, string) (*models.ActorInfo, error) 50 + ApplyPDSClientSettings func(*xrpc.Client) 48 51 } 49 52 50 53 func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events.EventManager, didr did.Resolver, repoman *repomgr.RepoManager, crawl, aggregate bool) (*Indexer, error) { ··· 64 67 SendRemoteFollow: func(context.Context, string, uint) error { 65 68 return nil 66 69 }, 70 + ApplyPDSClientSettings: func(*xrpc.Client) {}, 67 71 } 68 72 69 73 if crawl { ··· 106 110 107 111 toobig := false 108 112 slice := evt.RepoSlice 109 - if len(slice) > carstore.MaxSliceLength { 113 + if len(slice) > MaxEventSliceLength || len(outops) > MaxOpsSliceLength { 110 114 slice = nil 115 + outops = nil 111 116 toobig = true 112 - 113 117 } 114 118 115 119 if evt.Rebase { ··· 372 376 } 373 377 return nil 374 378 case *bsky.GraphFollow: 379 + _, err := ix.GetUserOrMissing(ctx, rec.Subject) 380 + if err != nil { 381 + log.Infow("failed to crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err) 382 + } 383 + return nil 384 + case *bsky.GraphBlock: 375 385 _, err := ix.GetUserOrMissing(ctx, rec.Subject) 376 386 if err != nil { 377 387 log.Infow("failed to 
crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err) ··· 828 838 829 839 curHead, err := ix.repomgr.GetRepoRoot(ctx, ai.Uid) 830 840 if err != nil && !isNotFound(err) { 831 - return err 841 + return fmt.Errorf("failed to get repo root: %w", err) 832 842 } 833 843 834 844 var rebase *comatproto.SyncSubscribeRepos_Commit ··· 848 858 } 849 859 850 860 if rebase != nil { 851 - return ix.repomgr.HandleRebase(ctx, ai.PDS, ai.Uid, ai.Did, (*cid.Cid)(rebase.Prev), (cid.Cid)(rebase.Commit), rebase.Blocks) 861 + if err := ix.repomgr.HandleRebase(ctx, ai.PDS, ai.Uid, ai.Did, (*cid.Cid)(rebase.Prev), (cid.Cid)(rebase.Commit), rebase.Blocks); err != nil { 862 + return fmt.Errorf("handling rebase: %w", err) 863 + } 864 + return nil 852 865 } 853 866 854 867 var host string ··· 860 873 c := &xrpc.Client{ 861 874 Host: host, 862 875 } 876 + 877 + ix.ApplyPDSClientSettings(c) 863 878 864 879 var from string 865 880 if curHead.Defined() {
+1
models/models.go
··· 108 108 SSL bool 109 109 Cursor int64 110 110 Registered bool 111 + Blocked bool 111 112 } 112 113 113 114 func ClientForPds(pds *PDS) *xrpc.Client {
+1 -5
repomgr/repomgr.go
··· 533 533 // TODO: do we allow prev to be nil in any case here? 534 534 if prev != nil { 535 535 if *prev != head { 536 - log.Errorw("rebase 'prev' value did not match our latest head for repo", "did", did, "rprev", prev.String(), "lprev", head.String()) 536 + log.Warnw("rebase 'prev' value did not match our latest head for repo", "did", did, "rprev", prev.String(), "lprev", head.String()) 537 537 } 538 538 } 539 539 ··· 992 992 slice, err := finish(ctx) 993 993 if err != nil { 994 994 return err 995 - } 996 - 997 - if len(slice) > carstore.MaxSliceLength { 998 - log.Warnw("output slice too large", "len", len(slice), "user", user, "old", old, "new", nu) 999 995 } 1000 996 1001 997 if err := rm.updateUserRepoHead(ctx, user, nu); err != nil {
+1 -1
util/http.go
··· 52 52 retryClient.RetryWaitMax = 10 * time.Second 53 53 retryClient.Logger = retryablehttp.LeveledLogger(LeveledZap{log}) 54 54 client := retryClient.StandardClient() 55 - client.Timeout = 20 * time.Second 55 + client.Timeout = 30 * time.Second 56 56 return client 57 57 } 58 58
+14 -4
xrpc/xrpc.go
··· 40 40 Did string `json:"did"` 41 41 } 42 42 43 + type XRPCError struct { 44 + ErrStr string `json:"error"` 45 + Message string `json:"message"` 46 + } 47 + 48 + func (xe *XRPCError) Error() string { 49 + return fmt.Sprintf("%s: %s", xe.ErrStr, xe.Message) 50 + } 51 + 43 52 const ( 44 53 Query = XRPCRequestType(iota) 45 54 Procedure ··· 126 135 defer resp.Body.Close() 127 136 128 137 if resp.StatusCode != 200 { 129 - var i interface{} 130 - _ = json.NewDecoder(resp.Body).Decode(&i) 131 - fmt.Println(i) 132 - return fmt.Errorf("XRPC ERROR %d: %s", resp.StatusCode, resp.Status) 138 + var xe XRPCError 139 + if err := json.NewDecoder(resp.Body).Decode(&xe); err != nil { 140 + return fmt.Errorf("failed to decode xrpc error message (status: %d): %w", resp.StatusCode, err) 141 + } 142 + return fmt.Errorf("XRPC ERROR %d: %w", resp.StatusCode, &xe) 133 143 } 134 144 135 145 if out != nil {