this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

remove blobs package (including from BGS) (#675)

We haven't ever actually used the blob sync stuff in prod. This removes
that dead code, and the entire "blobs" package. We can always pull this
out of git history if we decide we want it back.

This change is expected to be safe and a no-op to merge and deploy. I
verified that `disk-blob-store` is a flag-only option, and that flag
does not show up in `grep` results in our internal infra repos.

authored by

bnewbold and committed by
GitHub
638c7b9a ce1279f6

+4 -177
+2 -43
bgs/bgs.go
··· 19 19 "github.com/bluesky-social/indigo/api" 20 20 atproto "github.com/bluesky-social/indigo/api/atproto" 21 21 comatproto "github.com/bluesky-social/indigo/api/atproto" 22 - "github.com/bluesky-social/indigo/blobs" 23 22 "github.com/bluesky-social/indigo/carstore" 24 23 "github.com/bluesky-social/indigo/did" 25 24 "github.com/bluesky-social/indigo/events" ··· 62 61 didr did.Resolver 63 62 repoFetcher *indexer.RepoFetcher 64 63 65 - blobs blobs.BlobStore 66 - hr api.HandleResolver 64 + hr api.HandleResolver 67 65 68 66 // TODO: work on doing away with this flag in favor of more pluggable 69 67 // pieces that abstract the need for explicit ssl checks ··· 107 105 EventsSent promclient.Counter 108 106 } 109 107 110 - func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, blobs blobs.BlobStore, rf *indexer.RepoFetcher, hr api.HandleResolver, ssl bool, compactInterval time.Duration) (*BGS, error) { 108 + func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, rf *indexer.RepoFetcher, hr api.HandleResolver, ssl bool, compactInterval time.Duration) (*BGS, error) { 111 109 db.AutoMigrate(User{}) 112 110 db.AutoMigrate(AuthToken{}) 113 111 db.AutoMigrate(models.PDS{}) ··· 122 120 repoman: repoman, 123 121 events: evtman, 124 122 didr: didr, 125 - blobs: blobs, 126 123 ssl: ssl, 127 124 128 125 consumersLk: sync.RWMutex{}, ··· 894 891 return fmt.Errorf("handle user event failed: %w", err) 895 892 } 896 893 897 - // sync blobs 898 - if len(evt.Blobs) > 0 { 899 - var blobStrs []string 900 - for _, b := range evt.Blobs { 901 - blobStrs = append(blobStrs, b.String()) 902 - } 903 - if err := bgs.syncUserBlobs(ctx, host, u.ID, blobStrs); err != nil { 904 - return err 905 - } 906 - } 907 - 908 894 return nil 909 895 case env.RepoHandle != nil: 910 896 log.Infow("bgs got repo handle event", "did", env.RepoHandle.Did, "handle", env.RepoHandle.Handle) ··· 1011 997 return bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{ 1012 998 RepoTombstone: evt, 1013 999 }) 1014 - } 1015 - 1016 - func (s *BGS) syncUserBlobs(ctx context.Context, pds *models.PDS, user models.Uid, blobs []string) error { 1017 - if s.blobs == nil { 1018 - log.Debugf("blob syncing disabled") 1019 - return nil 1020 - } 1021 - 1022 - did, err := s.Index.DidForUser(ctx, user) 1023 - if err != nil { 1024 - return err 1025 - } 1026 - 1027 - for _, b := range blobs { 1028 - c := models.ClientForPds(pds) 1029 - s.Index.ApplyPDSClientSettings(c) 1030 - blob, err := atproto.SyncGetBlob(ctx, c, b, did) 1031 - if err != nil { 1032 - return fmt.Errorf("fetching blob (%s, %s): %w", did, b, err) 1033 - } 1034 - 1035 - if err := s.blobs.PutBlob(ctx, b, did, blob); err != nil { 1036 - return fmt.Errorf("storing blob (%s, %s): %w", did, b, err) 1037 - } 1038 - } 1039 - 1040 - return nil 1041 1000 } 1042 1001 1043 1002 // TODO: rename? This also updates users, and 'external' is an old phrasing
-22
bgs/handlers.go
··· 13 13 14 14 atproto "github.com/bluesky-social/indigo/api/atproto" 15 15 comatprototypes "github.com/bluesky-social/indigo/api/atproto" 16 - "github.com/bluesky-social/indigo/blobs" 17 16 "github.com/bluesky-social/indigo/carstore" 18 17 "github.com/bluesky-social/indigo/mst" 19 18 "gorm.io/gorm" ··· 168 167 func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, body *comatprototypes.SyncNotifyOfUpdate_Input) error { 169 168 // TODO: 170 169 return nil 171 - } 172 - 173 - func (s *BGS) handleComAtprotoSyncGetBlob(ctx context.Context, cid string, did string) (io.Reader, error) { 174 - if s.blobs == nil { 175 - return nil, echo.NewHTTPError(http.StatusNotFound, "blobs not enabled on this server") 176 - } 177 - 178 - b, err := s.blobs.GetBlob(ctx, cid, did) 179 - if err != nil { 180 - if errors.Is(err, blobs.NotFoundErr) { 181 - return nil, echo.NewHTTPError(http.StatusNotFound, "blob not found") 182 - } 183 - log.Errorw("failed to get blob", "err", err, "cid", cid, "did", did) 184 - return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to get blob") 185 - } 186 - 187 - return bytes.NewReader(b), nil 188 - } 189 - 190 - func (s *BGS) handleComAtprotoSyncListBlobs(ctx context.Context, cursor string, did string, limit int, since string) (*comatprototypes.SyncListBlobs_Output, error) { 191 - return nil, fmt.Errorf("NYI") 192 170 } 193 171 194 172 func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor string, limit int) (*comatprototypes.SyncListRepos_Output, error) {
-61
bgs/stubs.go
··· 22 22 } 23 23 24 24 func (s *BGS) RegisterHandlersComAtproto(e *echo.Echo) error { 25 - e.GET("/xrpc/com.atproto.sync.getBlob", s.HandleComAtprotoSyncGetBlob) 26 25 e.GET("/xrpc/com.atproto.sync.getBlocks", s.HandleComAtprotoSyncGetBlocks) 27 26 e.GET("/xrpc/com.atproto.sync.getLatestCommit", s.HandleComAtprotoSyncGetLatestCommit) 28 27 e.GET("/xrpc/com.atproto.sync.getRecord", s.HandleComAtprotoSyncGetRecord) 29 28 e.GET("/xrpc/com.atproto.sync.getRepo", s.HandleComAtprotoSyncGetRepo) 30 - e.GET("/xrpc/com.atproto.sync.listBlobs", s.HandleComAtprotoSyncListBlobs) 31 29 e.GET("/xrpc/com.atproto.sync.listRepos", s.HandleComAtprotoSyncListRepos) 32 30 e.POST("/xrpc/com.atproto.sync.notifyOfUpdate", s.HandleComAtprotoSyncNotifyOfUpdate) 33 31 e.POST("/xrpc/com.atproto.sync.requestCrawl", s.HandleComAtprotoSyncRequestCrawl) 34 32 return nil 35 - } 36 - 37 - func (s *BGS) HandleComAtprotoSyncGetBlob(c echo.Context) error { 38 - ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetBlob") 39 - defer span.End() 40 - bCid := c.QueryParam("cid") 41 - did := c.QueryParam("did") 42 - 43 - _, err := cid.Parse(bCid) 44 - if err != nil { 45 - return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid cid: %s", bCid)}) 46 - } 47 - 48 - _, err = syntax.ParseDID(did) 49 - if err != nil { 50 - return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid did: %s", did)}) 51 - } 52 - 53 - var out io.Reader 54 - var handleErr error 55 - // func (s *BGS) handleComAtprotoSyncGetBlob(ctx context.Context,cid string,did string) (io.Reader, error) 56 - out, handleErr = s.handleComAtprotoSyncGetBlob(ctx, bCid, did) 57 - if handleErr != nil { 58 - return handleErr 59 - } 60 - return c.Stream(200, "application/octet-stream", out) 61 33 } 62 34 63 35 func (s *BGS) HandleComAtprotoSyncGetBlocks(c echo.Context) error { ··· 159 131 return handleErr 160 132 } 161 133 return c.Stream(200, "application/vnd.ipld.car", out) 162 - } 163 - 164 - func (s *BGS) HandleComAtprotoSyncListBlobs(c echo.Context) error { 165 - ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListBlobs") 166 - defer span.End() 167 - cursor := c.QueryParam("cursor") 168 - did := c.QueryParam("did") 169 - 170 - var limit int 171 - if p := c.QueryParam("limit"); p != "" { 172 - var err error 173 - limit, err = strconv.Atoi(p) 174 - if err != nil { 175 - return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid limit: %s", p)}) 176 - } 177 - } else { 178 - limit = 500 179 - } 180 - 181 - _, err := syntax.ParseDID(did) 182 - if err != nil { 183 - return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid did: %s", did)}) 184 - } 185 - 186 - since := c.QueryParam("since") 187 - var out *comatprototypes.SyncListBlobs_Output 188 - var handleErr error 189 - // func (s *BGS) handleComAtprotoSyncListBlobs(ctx context.Context,cursor string,did string,limit int,since string) (*comatprototypes.SyncListBlobs_Output, error) 190 - out, handleErr = s.handleComAtprotoSyncListBlobs(ctx, cursor, did, limit, since) 191 - if handleErr != nil { 192 - return handleErr 193 - } 194 - return c.JSON(200, out) 195 134 } 196 135 197 136 func (s *BGS) HandleComAtprotoSyncListRepos(c echo.Context) error {
-40
blobs/blobs.go
··· 1 - package blobs 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - "os" 7 - "path/filepath" 8 - ) 9 - 10 - var NotFoundErr = fmt.Errorf("blob not found") 11 - 12 - type BlobStore interface { 13 - PutBlob(ctx context.Context, cid string, did string, blob []byte) error 14 - GetBlob(ctx context.Context, cid string, did string) ([]byte, error) 15 - } 16 - 17 - type DiskBlobStore struct { 18 - Dir string 19 - } 20 - 21 - func (dbs *DiskBlobStore) PutBlob(ctx context.Context, cid string, did string, blob []byte) error { 22 - udir := filepath.Join(dbs.Dir, did) 23 - if err := os.MkdirAll(udir, 0775); err != nil { 24 - return err 25 - } 26 - 27 - return os.WriteFile(filepath.Join(udir, cid), blob, 0664) 28 - } 29 - 30 - func (dbs *DiskBlobStore) GetBlob(ctx context.Context, cid string, did string) ([]byte, error) { 31 - // Check if the blob exists 32 - _, err := os.Stat(filepath.Join(dbs.Dir, did, cid)) 33 - if err != nil { 34 - if os.IsNotExist(err) { 35 - return nil, NotFoundErr 36 - } 37 - return nil, err 38 - } 39 - return os.ReadFile(filepath.Join(dbs.Dir, did, cid)) 40 - }
+1 -10
cmd/bigsky/main.go
··· 12 12 13 13 "github.com/bluesky-social/indigo/api" 14 14 libbgs "github.com/bluesky-social/indigo/bgs" 15 - "github.com/bluesky-social/indigo/blobs" 16 15 "github.com/bluesky-social/indigo/carstore" 17 16 "github.com/bluesky-social/indigo/did" 18 17 "github.com/bluesky-social/indigo/events" ··· 113 112 Name: "metrics-listen", 114 113 Value: ":2471", 115 114 EnvVars: []string{"BGS_METRICS_LISTEN"}, 116 - }, 117 - &cli.StringFlag{ 118 - Name: "disk-blob-store", 119 115 }, 120 116 &cli.StringFlag{ 121 117 Name: "disk-persister-dir", ··· 332 328 } 333 329 }, false) 334 330 335 - var blobstore blobs.BlobStore 336 - if bsdir := cctx.String("disk-blob-store"); bsdir != "" { 337 - blobstore = &blobs.DiskBlobStore{Dir: bsdir} 338 - } 339 - 340 331 prodHR, err := api.NewProdHandleResolver(100_000, cctx.String("resolve-address"), cctx.Bool("force-dns-udp")) 341 332 if err != nil { 342 333 return fmt.Errorf("failed to set up handle resolver: %w", err) ··· 358 349 } 359 350 360 351 log.Infow("constructing bgs") 361 - bgs, err := libbgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, rf, hr, !cctx.Bool("crawl-insecure-ws"), cctx.Duration("compact-interval")) 352 + bgs, err := libbgs.NewBGS(db, ix, repoman, evtman, cachedidr, rf, hr, !cctx.Bool("crawl-insecure-ws"), cctx.Duration("compact-interval")) 362 353 if err != nil { 363 354 return err 364 355 }
+1 -1
testing/utils.go
··· 514 514 515 515 tr := &api.TestHandleResolver{} 516 516 517 - b, err := bgs.NewBGS(maindb, ix, repoman, evtman, didr, nil, rf, tr, false, time.Hour*4) 517 + b, err := bgs.NewBGS(maindb, ix, repoman, evtman, didr, rf, tr, false, time.Hour*4) 518 518 if err != nil { 519 519 return nil, err 520 520 }