this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Feat/blob syncing (#73)

authored by

Whyrusleeping and committed by
GitHub
ce417995 e175912a

+440 -63
+27
api/atproto/syncgetBlob.go
··· 1 + package atproto 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + 7 + "github.com/bluesky-social/indigo/xrpc" 8 + ) 9 + 10 + // schema: com.atproto.sync.getBlob 11 + 12 + func init() { 13 + } 14 + 15 + func SyncGetBlob(ctx context.Context, c *xrpc.Client, cid string, did string) ([]byte, error) { 16 + buf := new(bytes.Buffer) 17 + 18 + params := map[string]interface{}{ 19 + "cid": cid, 20 + "did": did, 21 + } 22 + if err := c.Do(ctx, xrpc.Query, "", "com.atproto.sync.getBlob", params, nil, buf); err != nil { 23 + return nil, err 24 + } 25 + 26 + return buf.Bytes(), nil 27 + }
+32
api/atproto/synclistBlobs.go
··· 1 + package atproto 2 + 3 + import ( 4 + "context" 5 + 6 + "github.com/bluesky-social/indigo/xrpc" 7 + ) 8 + 9 + // schema: com.atproto.sync.listBlobs 10 + 11 + func init() { 12 + } 13 + 14 + type SyncListBlobs_Output struct { 15 + LexiconTypeID string `json:"$type,omitempty"` 16 + Cids []string `json:"cids" cborgen:"cids"` 17 + } 18 + 19 + func SyncListBlobs(ctx context.Context, c *xrpc.Client, did string, earliest string, latest string) (*SyncListBlobs_Output, error) { 20 + var out SyncListBlobs_Output 21 + 22 + params := map[string]interface{}{ 23 + "did": did, 24 + "earliest": earliest, 25 + "latest": latest, 26 + } 27 + if err := c.Do(ctx, xrpc.Query, "", "com.atproto.sync.listBlobs", params, nil, &out); err != nil { 28 + return nil, err 29 + } 30 + 31 + return &out, nil 32 + }
+6 -2
api/atproto/syncnotifyOfUpdate.go
··· 10 10 11 11 func init() { 12 12 } 13 - func SyncNotifyOfUpdate(ctx context.Context, c *xrpc.Client) error { 14 - if err := c.Do(ctx, xrpc.Query, "", "com.atproto.sync.notifyOfUpdate", nil, nil, nil); err != nil { 13 + func SyncNotifyOfUpdate(ctx context.Context, c *xrpc.Client, hostname string) error { 14 + 15 + params := map[string]interface{}{ 16 + "hostname": hostname, 17 + } 18 + if err := c.Do(ctx, xrpc.Query, "", "com.atproto.sync.notifyOfUpdate", params, nil, nil); err != nil { 15 19 return err 16 20 } 17 21
+2 -2
api/atproto/syncrequestCrawl.go
··· 10 10 11 11 func init() { 12 12 } 13 - func SyncRequestCrawl(ctx context.Context, c *xrpc.Client, host string) error { 13 + func SyncRequestCrawl(ctx context.Context, c *xrpc.Client, hostname string) error { 14 14 15 15 params := map[string]interface{}{ 16 - "host": host, 16 + "hostname": hostname, 17 17 } 18 18 if err := c.Do(ctx, xrpc.Query, "", "com.atproto.sync.requestCrawl", params, nil, nil); err != nil { 19 19 return err
+1 -1
api/bsky/feedpost.go
··· 87 87 } 88 88 return fmt.Errorf("cannot cbor marshal empty enum") 89 89 } 90 - 91 90 func (t *FeedPost_Embed) UnmarshalCBOR(r io.Reader) error { 92 91 typ, b, err := util.CborTypeExtractReader(r) 93 92 if err != nil { ··· 104 103 case "app.bsky.embed.record": 105 104 t.EmbedRecord = new(EmbedRecord) 106 105 return t.EmbedRecord.UnmarshalCBOR(bytes.NewReader(b)) 106 + 107 107 default: 108 108 return nil 109 109 }
+96 -3
bgs/bgs.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "encoding/json" 5 6 "errors" 6 7 "fmt" 7 8 "net/http" ··· 14 15 15 16 "contrib.go.opencensus.io/exporter/prometheus" 16 17 atproto "github.com/bluesky-social/indigo/api/atproto" 18 + "github.com/bluesky-social/indigo/blobs" 17 19 "github.com/bluesky-social/indigo/carstore" 18 20 "github.com/bluesky-social/indigo/events" 19 21 "github.com/bluesky-social/indigo/indexer" ··· 43 45 events *events.EventManager 44 46 didr plc.DidResolver 45 47 48 + blobs blobs.BlobStore 49 + 46 50 crawlOnly bool 47 51 48 52 // TODO: at some point we will want to lock specific DIDs, this lock as is ··· 52 56 repoman *repomgr.RepoManager 53 57 } 54 58 55 - func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr plc.DidResolver, ssl bool) (*BGS, error) { 59 + func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr plc.DidResolver, blobs blobs.BlobStore, ssl bool) (*BGS, error) { 56 60 db.AutoMigrate(User{}) 57 61 db.AutoMigrate(models.PDS{}) 58 62 ··· 63 67 repoman: repoman, 64 68 events: evtman, 65 69 didr: didr, 70 + blobs: blobs, 66 71 } 67 72 68 73 ix.CreateExternalUser = bgs.createExternalUser ··· 76 81 } 77 82 78 83 func (bgs *BGS) StartDebug(listen string) error { 84 + http.HandleFunc("/repodbg/user", func(w http.ResponseWriter, r *http.Request) { 85 + ctx := r.Context() 86 + did := r.FormValue("did") 87 + 88 + u, err := bgs.Index.LookupUserByDid(ctx, did) 89 + if err != nil { 90 + http.Error(w, err.Error(), 400) 91 + return 92 + } 93 + 94 + root, err := bgs.repoman.GetRepoRoot(ctx, u.Uid) 95 + if err != nil { 96 + http.Error(w, err.Error(), 400) 97 + return 98 + } 99 + 100 + out := map[string]any{ 101 + "root": root.String(), 102 + "actorInfo": u, 103 + } 104 + 105 + if r.FormValue("carstore") != "" { 106 + stat, err := bgs.repoman.CarStore().Stat(ctx, u.Uid) 107 + if err != nil { 108 + http.Error(w, err.Error(), 400) 109 + return 110 + } 
111 + out["carstore"] = stat 112 + } 113 + 114 + json.NewEncoder(w).Encode(out) 115 + }) 116 + http.HandleFunc("/repodbg/blocks", func(w http.ResponseWriter, r *http.Request) { 117 + ctx := r.Context() 118 + did := r.FormValue("did") 119 + c := r.FormValue("cid") 120 + 121 + bcid, err := cid.Decode(c) 122 + if err != nil { 123 + http.Error(w, err.Error(), 400) 124 + return 125 + } 126 + 127 + cs := bgs.repoman.CarStore() 128 + 129 + u, err := bgs.Index.LookupUserByDid(ctx, did) 130 + if err != nil { 131 + http.Error(w, err.Error(), 400) 132 + return 133 + } 134 + 135 + bs, err := cs.ReadOnlySession(u.Uid) 136 + if err != nil { 137 + http.Error(w, err.Error(), 400) 138 + return 139 + } 140 + 141 + blk, err := bs.Get(ctx, bcid) 142 + if err != nil { 143 + http.Error(w, err.Error(), 400) 144 + return 145 + } 146 + 147 + w.WriteHeader(200) 148 + w.Write(blk.RawData()) 149 + 150 + }) 79 151 http.Handle("/prometheus", prometheusHandler()) 80 152 81 153 return http.ListenAndServe(listen, nil) ··· 274 346 // TODO: if the user is already in the 'slow' path, we shouldnt even bother trying to fast path this event 275 347 276 348 if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, prevcid, evt.Blocks); err != nil { 349 + log.Warnw("failed handling event", "err", err, "host", host.Host, "seq", evt.Seq) 277 350 if !errors.Is(err, carstore.ErrRepoBaseMismatch) { 278 - log.Warnw("failed handling event", "err", err, "host", host.Host, "seq", evt.Seq) 279 351 return fmt.Errorf("handle user event failed: %w", err) 280 352 } 281 353 ··· 303 375 } 304 376 305 377 func (s *BGS) syncUserBlobs(ctx context.Context, pds *models.PDS, user bsutil.Uid, blobs []string) error { 306 - log.Warnf("not handling blob syncing yet") 378 + if s.blobs == nil { 379 + log.Infof("blob syncing disabled") 380 + return nil 381 + } 382 + 383 + did, err := s.Index.DidForUser(ctx, user) 384 + if err != nil { 385 + return err 386 + } 387 + 388 + for _, b := range blobs { 389 + c := 
models.ClientForPds(pds) 390 + blob, err := atproto.SyncGetBlob(ctx, c, b, did) 391 + if err != nil { 392 + return fmt.Errorf("fetching blob (%s, %s): %w", did, b, err) 393 + } 394 + 395 + if err := s.blobs.PutBlob(ctx, b, did, blob); err != nil { 396 + return fmt.Errorf("storing blob (%s, %s): %w", did, b, err) 397 + } 398 + } 399 + 307 400 return nil 308 401 } 309 402
+23 -1
bgs/handlers.go
··· 3 3 import ( 4 4 "bytes" 5 5 "context" 6 + "fmt" 6 7 "io" 7 8 8 9 comatprototypes "github.com/bluesky-social/indigo/api/atproto" ··· 95 96 } 96 97 97 98 func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, host string) error { 99 + if host == "" { 100 + return fmt.Errorf("must pass valid hostname") 101 + } 102 + 98 103 log.Warnf("TODO: host validation for crawl requests") 99 104 return s.slurper.SubscribeToPds(ctx, host, true) 100 105 } 101 106 102 - func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context) error { 107 + func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, hostname string) error { 103 108 panic("NYI") 104 109 //return s.slurper.SubscribeToPds(ctx, host, false) 105 110 } 111 + 112 + func (s *BGS) handleComAtprotoSyncGetBlob(ctx context.Context, cid string, did string) (io.Reader, error) { 113 + if s.blobs == nil { 114 + return nil, fmt.Errorf("blob store disabled") 115 + } 116 + 117 + b, err := s.blobs.GetBlob(ctx, cid, did) 118 + if err != nil { 119 + return nil, err 120 + } 121 + 122 + return bytes.NewReader(b), nil 123 + } 124 + 125 + func (s *BGS) handleComAtprotoSyncListBlobs(ctx context.Context, did string, earliest string, latest string) (*comatprototypes.SyncListBlobs_Output, error) { 126 + panic("NYI") 127 + }
+39 -5
bgs/stubs.go
··· 13 13 } 14 14 15 15 func (s *BGS) RegisterHandlersComAtproto(e *echo.Echo) error { 16 + e.GET("/xrpc/com.atproto.sync.getBlob", s.HandleComAtprotoSyncGetBlob) 16 17 e.GET("/xrpc/com.atproto.sync.getBlocks", s.HandleComAtprotoSyncGetBlocks) 17 18 e.GET("/xrpc/com.atproto.sync.getCheckout", s.HandleComAtprotoSyncGetCheckout) 18 19 e.GET("/xrpc/com.atproto.sync.getCommitPath", s.HandleComAtprotoSyncGetCommitPath) 19 20 e.GET("/xrpc/com.atproto.sync.getHead", s.HandleComAtprotoSyncGetHead) 20 21 e.GET("/xrpc/com.atproto.sync.getRecord", s.HandleComAtprotoSyncGetRecord) 21 22 e.GET("/xrpc/com.atproto.sync.getRepo", s.HandleComAtprotoSyncGetRepo) 23 + e.GET("/xrpc/com.atproto.sync.listBlobs", s.HandleComAtprotoSyncListBlobs) 22 24 e.GET("/xrpc/com.atproto.sync.notifyOfUpdate", s.HandleComAtprotoSyncNotifyOfUpdate) 23 25 e.GET("/xrpc/com.atproto.sync.requestCrawl", s.HandleComAtprotoSyncRequestCrawl) 24 26 return nil 27 + } 28 + 29 + func (s *BGS) HandleComAtprotoSyncGetBlob(c echo.Context) error { 30 + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetBlob") 31 + defer span.End() 32 + cid := c.QueryParam("cid") 33 + did := c.QueryParam("did") 34 + var out io.Reader 35 + var handleErr error 36 + // func (s *BGS) handleComAtprotoSyncGetBlob(ctx context.Context,cid string,did string) (io.Reader, error) 37 + out, handleErr = s.handleComAtprotoSyncGetBlob(ctx, cid, did) 38 + if handleErr != nil { 39 + return handleErr 40 + } 41 + return c.Stream(200, "application/octet-stream", out) 25 42 } 26 43 27 44 func (s *BGS) HandleComAtprotoSyncGetBlocks(c echo.Context) error { ··· 118 135 return c.Stream(200, "application/vnd.ipld.car", out) 119 136 } 120 137 138 + func (s *BGS) HandleComAtprotoSyncListBlobs(c echo.Context) error { 139 + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListBlobs") 140 + defer span.End() 141 + did := c.QueryParam("did") 142 + earliest := c.QueryParam("earliest") 143 + latest 
:= c.QueryParam("latest") 144 + var out *comatprototypes.SyncListBlobs_Output 145 + var handleErr error 146 + // func (s *BGS) handleComAtprotoSyncListBlobs(ctx context.Context,did string,earliest string,latest string) (*comatprototypes.SyncListBlobs_Output, error) 147 + out, handleErr = s.handleComAtprotoSyncListBlobs(ctx, did, earliest, latest) 148 + if handleErr != nil { 149 + return handleErr 150 + } 151 + return c.JSON(200, out) 152 + } 153 + 121 154 func (s *BGS) HandleComAtprotoSyncNotifyOfUpdate(c echo.Context) error { 122 155 ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncNotifyOfUpdate") 123 156 defer span.End() 157 + hostname := c.QueryParam("hostname") 124 158 var handleErr error 125 - // func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context) error 126 - handleErr = s.handleComAtprotoSyncNotifyOfUpdate(ctx) 159 + // func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context,hostname string) error 160 + handleErr = s.handleComAtprotoSyncNotifyOfUpdate(ctx, hostname) 127 161 if handleErr != nil { 128 162 return handleErr 129 163 } ··· 133 167 func (s *BGS) HandleComAtprotoSyncRequestCrawl(c echo.Context) error { 134 168 ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncRequestCrawl") 135 169 defer span.End() 136 - host := c.QueryParam("host") 170 + hostname := c.QueryParam("hostname") 137 171 var handleErr error 138 - // func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context,host string) error 139 - handleErr = s.handleComAtprotoSyncRequestCrawl(ctx, host) 172 + // func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context,hostname string) error 173 + handleErr = s.handleComAtprotoSyncRequestCrawl(ctx, hostname) 140 174 if handleErr != nil { 141 175 return handleErr 142 176 }
+29
blobs/blobs.go
// BlobStore stores and retrieves raw blob bytes addressed by the blob's CID
// and the DID it is stored under.
type BlobStore interface {
	// PutBlob stores blob under the (did, cid) pair.
	PutBlob(ctx context.Context, cid string, did string, blob []byte) error
	// GetBlob returns the bytes stored under the (did, cid) pair.
	GetBlob(ctx context.Context, cid string, did string) ([]byte, error)
}

// DiskBlobStore is a BlobStore backed by the local filesystem. Each blob lives
// in its own file at <Dir>/<did>/<cid>.
type DiskBlobStore struct {
	// Dir is the root directory under which one sub-directory per DID is
	// created.
	Dir string
}

// Compile-time check that DiskBlobStore satisfies BlobStore.
var _ BlobStore = (*DiskBlobStore)(nil)

// validBlobPathElem reports whether name can safely be used as exactly one
// path element: it must be non-empty, must not be "." or "..", and must not
// contain an OS path separator.
func validBlobPathElem(name string) bool {
	if name == "" || name == "." || name == ".." {
		return false
	}
	for i := 0; i < len(name); i++ {
		if os.IsPathSeparator(name[i]) {
			return false
		}
	}
	return true
}

// blobPaths validates did and cid and returns the per-DID directory and the
// full blob file path below dbs.Dir. Both identifiers may originate from
// request parameters, so anything that could make filepath.Join resolve
// outside dbs.Dir is rejected with an *os.PathError wrapping os.ErrInvalid.
func (dbs *DiskBlobStore) blobPaths(op string, cid string, did string) (string, string, error) {
	if !validBlobPathElem(did) {
		return "", "", &os.PathError{Op: op, Path: did, Err: os.ErrInvalid}
	}
	if !validBlobPathElem(cid) {
		return "", "", &os.PathError{Op: op, Path: cid, Err: os.ErrInvalid}
	}

	udir := filepath.Join(dbs.Dir, did)
	return udir, filepath.Join(udir, cid), nil
}

// PutBlob writes blob to <Dir>/<did>/<cid>, creating the per-DID directory if
// needed and replacing any existing file at that path. It returns an error
// (matching os.ErrInvalid) without touching the filesystem when did or cid is
// not a single plain path element. ctx is currently unused.
func (dbs *DiskBlobStore) PutBlob(ctx context.Context, cid string, did string, blob []byte) error {
	udir, fpath, err := dbs.blobPaths("putblob", cid, did)
	if err != nil {
		return err
	}

	if err := os.MkdirAll(udir, 0775); err != nil {
		return err
	}

	return os.WriteFile(fpath, blob, 0664)
}

// GetBlob reads and returns the contents of <Dir>/<did>/<cid>. It returns an
// error (matching os.ErrInvalid) without touching the filesystem when did or
// cid is not a single plain path element; otherwise any error comes from
// os.ReadFile. ctx is currently unused.
func (dbs *DiskBlobStore) GetBlob(ctx context.Context, cid string, did string) ([]byte, error) {
	_, fpath, err := dbs.blobPaths("getblob", cid, did)
	if err != nil {
		return nil, err
	}

	return os.ReadFile(fpath)
}
+26 -1
carstore/bs.go
··· 10 10 "os" 11 11 "path/filepath" 12 12 "sync" 13 + "time" 13 14 14 15 util "github.com/bluesky-social/indigo/util" 15 16 ··· 289 290 290 291 if prev != nil { 291 292 if lastShard.Root.CID != *prev { 292 - return nil, fmt.Errorf("mismatch: %s != %s: %w", lastShard.Root, prev.String(), ErrRepoBaseMismatch) 293 + return nil, fmt.Errorf("mismatch: %s != %s: %w", lastShard.Root.CID, prev.String(), ErrRepoBaseMismatch) 293 294 } 294 295 } 295 296 ··· 652 653 653 654 return lastShard.Root.CID, nil 654 655 } 656 + 657 + type UserStat struct { 658 + Seq int 659 + Root string 660 + Created time.Time 661 + } 662 + 663 + func (cs *CarStore) Stat(ctx context.Context, usr util.Uid) ([]UserStat, error) { 664 + var shards []CarShard 665 + if err := cs.meta.Order("seq asc").Find(&shards, "usr = ?", usr).Error; err != nil { 666 + return nil, err 667 + } 668 + 669 + var out []UserStat 670 + for _, s := range shards { 671 + out = append(out, UserStat{ 672 + Seq: s.Seq, 673 + Root: s.Root.CID.String(), 674 + Created: s.CreatedAt, 675 + }) 676 + } 677 + 678 + return out, nil 679 + }
+10 -1
cmd/bigsky/main.go
··· 9 9 10 10 "github.com/bluesky-social/indigo/api" 11 11 "github.com/bluesky-social/indigo/bgs" 12 + "github.com/bluesky-social/indigo/blobs" 12 13 "github.com/bluesky-social/indigo/carstore" 13 14 cliutil "github.com/bluesky-social/indigo/cmd/gosky/util" 14 15 "github.com/bluesky-social/indigo/events" ··· 106 107 Name: "debug-listen", 107 108 Value: "localhost:2471", 108 109 }, 110 + &cli.StringFlag{ 111 + Name: "disk-blob-store", 112 + }, 109 113 } 110 114 111 115 app.Action = func(cctx *cli.Context) error { ··· 194 198 } 195 199 }) 196 200 197 - bgs, err := bgs.NewBGS(db, ix, repoman, evtman, cachedidr, !cctx.Bool("crawl-insecure-ws")) 201 + var blobstore blobs.BlobStore 202 + if bsdir := cctx.String("disk-blob-store"); bsdir != "" { 203 + blobstore = &blobs.DiskBlobStore{bsdir} 204 + } 205 + 206 + bgs, err := bgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, !cctx.Bool("crawl-insecure-ws")) 198 207 if err != nil { 199 208 return err 200 209 }
+13 -17
cmd/gosky/debug.go
··· 11 11 12 12 "github.com/bluesky-social/indigo/events" 13 13 "github.com/bluesky-social/indigo/repo" 14 + "github.com/bluesky-social/indigo/repomgr" 14 15 "github.com/gorilla/websocket" 15 - "github.com/ipfs/go-cid" 16 16 "github.com/ipld/go-car/v2" 17 17 cli "github.com/urfave/cli/v2" 18 18 ) ··· 114 114 return fmt.Errorf("opening repo from slice: %w", err) 115 115 } 116 116 117 - var prev cid.Cid 118 - if match.Prev != nil { 119 - c, err := cid.Decode(*match.Prev) 120 - if err != nil { 121 - return err 117 + fmt.Println("\nOps: ") 118 + for _, op := range match.Ops { 119 + switch repomgr.EventKind(op.Action) { 120 + case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 121 + rcid, _, err := r.GetRecord(ctx, op.Path) 122 + if err != nil { 123 + return fmt.Errorf("loading %q: %w", op.Path, err) 124 + } 125 + if rcid.String() != *op.Cid { 126 + return fmt.Errorf("mismatch in record cid %s != %s", rcid, *op.Cid) 127 + } 128 + fmt.Printf("%s (%s): %s\n", op.Action, op.Path, *op.Cid) 122 129 } 123 - prev = c 124 - } 125 - 126 - fmt.Println("\nDiff ops: ") 127 - diff, err := r.DiffSince(ctx, prev) 128 - if err != nil { 129 - return err 130 - } 131 - 132 - for _, d := range diff { 133 - fmt.Printf("%s (%s): %s -> %s\n", d.Op, d.Rpath, d.OldCid, d.NewCid) 134 130 } 135 131 136 132 return nil
+11 -2
cmd/gosky/main.go
··· 59 59 &cli.StringFlag{ 60 60 Name: "pds-host", 61 61 Usage: "method, hostname, and port of PDS instance", 62 - Value: "http://localhost:4849", 62 + Value: "https://bsky.social", 63 63 EnvVars: []string{"ATP_PDS_HOST"}, 64 64 }, 65 65 &cli.StringFlag{ ··· 859 859 ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT) 860 860 defer stop() 861 861 862 + arg := cctx.Args().First() 863 + if !strings.Contains(arg, "subscribeAllRepos") { 864 + arg = arg + "/xrpc/com.atproto.sync.subscribeAllRepos" 865 + } 866 + if len(cctx.Args().Slice()) == 2 { 867 + arg = fmt.Sprintf("%s?cursor=%s", arg, cctx.Args().Get(1)) 868 + } 869 + 870 + fmt.Println("dialing: ", arg) 862 871 d := websocket.DefaultDialer 863 - con, _, err := d.Dial(cctx.Args().First(), http.Header{}) 872 + con, _, err := d.Dial(arg, http.Header{}) 864 873 if err != nil { 865 874 return fmt.Errorf("dial failure: %w", err) 866 875 }
+6
events/dbpersist.go
··· 63 63 64 64 evt := e.RepoAppend 65 65 66 + // TODO: hack hack hack 67 + if len(evt.Ops) > 8192 { 68 + log.Errorf("(VERY BAD) truncating ops field in outgoing event (len = %d)", len(evt.Ops)) 69 + evt.Ops = evt.Ops[:8192] 70 + } 71 + 66 72 uid, err := p.uidForDid(ctx, evt.Repo) 67 73 if err != nil { 68 74 return err
+17
events/events.go
··· 65 65 if s.filter(op.evt) { 66 66 select { 67 67 case s.outgoing <- op.evt: 68 + case <-s.done: 69 + go func(torem *Subscriber) { 70 + select { 71 + case em.ops <- &Operation{ 72 + op: opUnsubscribe, 73 + sub: torem, 74 + }: 75 + case <-em.closed: 76 + } 77 + }(s) 68 78 default: 69 79 log.Error("event overflow") 70 80 } ··· 207 217 log.Errorf("events playback: %s", err) 208 218 } 209 219 } 220 + } 221 + 222 + // if cancel happens before playback completes 223 + select { 224 + case <-done: 225 + return 226 + default: 210 227 } 211 228 212 229 select {
+8 -4
events/repostream.go
··· 11 11 cid "github.com/ipfs/go-cid" 12 12 ) 13 13 14 - type LiteStreamHandleFunc func(op repomgr.EventKind, path string, did string, rcid *cid.Cid, rec any) error 14 + type LiteStreamHandleFunc func(op repomgr.EventKind, seq int64, path string, did string, rcid *cid.Cid, rec any) error 15 15 16 16 func ConsumeRepoStreamLite(ctx context.Context, con *websocket.Conn, cb LiteStreamHandleFunc) error { 17 17 return HandleRepoStream(ctx, con, &RepoStreamCallbacks{ 18 18 RepoAppend: func(evt *RepoAppend) error { 19 + if evt.TooBig { 20 + log.Errorf("skipping too big events for now: %d", evt.Seq) 21 + return nil 22 + } 19 23 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 20 24 if err != nil { 21 - return err 25 + return fmt.Errorf("reading repo from car (seq: %d, len: %d): %w", evt.Seq, len(evt.Blocks), err) 22 26 } 23 27 24 28 for _, op := range evt.Ops { ··· 36 40 return fmt.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid) 37 41 } 38 42 39 - if err := cb(ek, op.Path, evt.Repo, &rc, rec); err != nil { 43 + if err := cb(ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil { 40 44 return err 41 45 } 42 46 43 47 case repomgr.EvtKindDeleteRecord: 44 - if err := cb(ek, op.Path, evt.Repo, nil, nil); err != nil { 48 + if err := cb(ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil { 45 49 return err 46 50 } 47 51 }
+7 -7
lex/gen.go
··· 129 129 130 130 if ts.Output != nil { 131 131 if ts.Output.Schema == nil { 132 - if ts.Output.Encoding != "application/cbor" && ts.Output.Encoding != "application/vnd.ipld.car" { 132 + if ts.Output.Encoding != "application/cbor" && ts.Output.Encoding != "application/vnd.ipld.car" && ts.Output.Encoding != "*/*" { 133 133 panic(fmt.Sprintf("strange output type def in %s", s.ID)) 134 134 } 135 135 } else { ··· 431 431 out := "error" 432 432 if s.Output != nil { 433 433 switch s.Output.Encoding { 434 - case EncodingCBOR, EncodingCAR: 434 + case EncodingCBOR, EncodingCAR, EncodingANY: 435 435 out = "([]byte, error)" 436 436 case EncodingJSON: 437 437 outname := fname + "_Output" ··· 452 452 outRet := "nil" 453 453 if s.Output != nil { 454 454 switch s.Output.Encoding { 455 - case EncodingCBOR, EncodingCAR: 455 + case EncodingCBOR, EncodingCAR, EncodingANY: 456 456 fmt.Fprintf(w, "buf := new(bytes.Buffer)\n") 457 457 outvar = "buf" 458 458 errRet = "nil, err" ··· 912 912 returndef := "error" 913 913 if s.Output != nil { 914 914 switch s.Output.Encoding { 915 - case "application/json": 915 + case EncodingJSON: 916 916 assign = "out, handleErr" 917 917 outname := tname + "_Output" 918 918 if s.Output.Schema.Type == "ref" { ··· 920 920 } 921 921 fmt.Fprintf(w, "var out *%s.%s\n", impname, outname) 922 922 returndef = fmt.Sprintf("(*%s.%s, error)", impname, outname) 923 - case "application/cbor", "application/vnd.ipld.car": 923 + case EncodingCBOR, EncodingCAR, EncodingANY: 924 924 assign = "out, handleErr" 925 925 fmt.Fprintf(w, "var out io.Reader\n") 926 926 returndef = "(io.Reader, error)" 927 927 default: 928 - return fmt.Errorf("unrecognized output encoding: %q", s.Output.Encoding) 928 + return fmt.Errorf("unrecognized output encoding (1): %q", s.Output.Encoding) 929 929 } 930 930 } 931 931 fmt.Fprintf(w, "var handleErr error\n") ··· 944 944 case EncodingCAR: 945 945 fmt.Fprintf(w, "return c.Stream(200, \"application/vnd.ipld.car\", out)\n}\n\n") 946 946 default: 
947 - return fmt.Errorf("unrecognized output encoding: %s", s.Output.Encoding) 947 + return fmt.Errorf("unrecognized output encoding (2): %q", s.Output.Encoding) 948 948 } 949 949 } else { 950 950 fmt.Fprintf(w, "return nil\n}\n\n")
+13
models/models.go
··· 7 7 8 8 bsky "github.com/bluesky-social/indigo/api/bsky" 9 9 "github.com/bluesky-social/indigo/util" 10 + "github.com/bluesky-social/indigo/xrpc" 10 11 ) 11 12 12 13 type FeedPost struct { ··· 104 105 Cursor int64 105 106 Registered bool 106 107 } 108 + 109 + func ClientForPds(pds *PDS) *xrpc.Client { 110 + if pds.SSL { 111 + return &xrpc.Client{ 112 + Host: "https://" + pds.Host, 113 + } 114 + } 115 + 116 + return &xrpc.Client{ 117 + Host: "http://" + pds.Host, 118 + } 119 + }
+9 -1
pds/handlers.go
··· 754 754 panic("nyi") 755 755 } 756 756 757 - func (s *Server) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context) error { 757 + func (s *Server) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, hostname string) error { 758 758 panic("nyi") 759 759 } 760 760 761 761 func (s *Server) handleComAtprotoSyncRequestCrawl(ctx context.Context, host string) error { 762 762 panic("nyi") 763 763 } 764 + 765 + func (s *Server) handleComAtprotoSyncGetBlob(ctx context.Context, cid string, did string) (io.Reader, error) { 766 + panic("nyi") 767 + } 768 + 769 + func (s *Server) handleComAtprotoSyncListBlobs(ctx context.Context, did string, earliest string, latest string) (*comatprototypes.SyncListBlobs_Output, error) { 770 + panic("nyi") 771 + }
+39 -5
pds/stubs.go
··· 505 505 e.POST("/xrpc/com.atproto.session.delete", s.HandleComAtprotoSessionDelete) 506 506 e.GET("/xrpc/com.atproto.session.get", s.HandleComAtprotoSessionGet) 507 507 e.POST("/xrpc/com.atproto.session.refresh", s.HandleComAtprotoSessionRefresh) 508 + e.GET("/xrpc/com.atproto.sync.getBlob", s.HandleComAtprotoSyncGetBlob) 508 509 e.GET("/xrpc/com.atproto.sync.getBlocks", s.HandleComAtprotoSyncGetBlocks) 509 510 e.GET("/xrpc/com.atproto.sync.getCheckout", s.HandleComAtprotoSyncGetCheckout) 510 511 e.GET("/xrpc/com.atproto.sync.getCommitPath", s.HandleComAtprotoSyncGetCommitPath) 511 512 e.GET("/xrpc/com.atproto.sync.getHead", s.HandleComAtprotoSyncGetHead) 512 513 e.GET("/xrpc/com.atproto.sync.getRecord", s.HandleComAtprotoSyncGetRecord) 513 514 e.GET("/xrpc/com.atproto.sync.getRepo", s.HandleComAtprotoSyncGetRepo) 515 + e.GET("/xrpc/com.atproto.sync.listBlobs", s.HandleComAtprotoSyncListBlobs) 514 516 e.GET("/xrpc/com.atproto.sync.notifyOfUpdate", s.HandleComAtprotoSyncNotifyOfUpdate) 515 517 e.GET("/xrpc/com.atproto.sync.requestCrawl", s.HandleComAtprotoSyncRequestCrawl) 516 518 return nil ··· 1104 1106 return c.JSON(200, out) 1105 1107 } 1106 1108 1109 + func (s *Server) HandleComAtprotoSyncGetBlob(c echo.Context) error { 1110 + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetBlob") 1111 + defer span.End() 1112 + cid := c.QueryParam("cid") 1113 + did := c.QueryParam("did") 1114 + var out io.Reader 1115 + var handleErr error 1116 + // func (s *Server) handleComAtprotoSyncGetBlob(ctx context.Context,cid string,did string) (io.Reader, error) 1117 + out, handleErr = s.handleComAtprotoSyncGetBlob(ctx, cid, did) 1118 + if handleErr != nil { 1119 + return handleErr 1120 + } 1121 + return c.Stream(200, "application/octet-stream", out) 1122 + } 1123 + 1107 1124 func (s *Server) HandleComAtprotoSyncGetBlocks(c echo.Context) error { 1108 1125 ctx, span := otel.Tracer("server").Start(c.Request().Context(), 
"HandleComAtprotoSyncGetBlocks") 1109 1126 defer span.End() ··· 1198 1215 return c.Stream(200, "application/vnd.ipld.car", out) 1199 1216 } 1200 1217 1218 + func (s *Server) HandleComAtprotoSyncListBlobs(c echo.Context) error { 1219 + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListBlobs") 1220 + defer span.End() 1221 + did := c.QueryParam("did") 1222 + earliest := c.QueryParam("earliest") 1223 + latest := c.QueryParam("latest") 1224 + var out *comatprototypes.SyncListBlobs_Output 1225 + var handleErr error 1226 + // func (s *Server) handleComAtprotoSyncListBlobs(ctx context.Context,did string,earliest string,latest string) (*comatprototypes.SyncListBlobs_Output, error) 1227 + out, handleErr = s.handleComAtprotoSyncListBlobs(ctx, did, earliest, latest) 1228 + if handleErr != nil { 1229 + return handleErr 1230 + } 1231 + return c.JSON(200, out) 1232 + } 1233 + 1201 1234 func (s *Server) HandleComAtprotoSyncNotifyOfUpdate(c echo.Context) error { 1202 1235 ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncNotifyOfUpdate") 1203 1236 defer span.End() 1237 + hostname := c.QueryParam("hostname") 1204 1238 var handleErr error 1205 - // func (s *Server) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context) error 1206 - handleErr = s.handleComAtprotoSyncNotifyOfUpdate(ctx) 1239 + // func (s *Server) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context,hostname string) error 1240 + handleErr = s.handleComAtprotoSyncNotifyOfUpdate(ctx, hostname) 1207 1241 if handleErr != nil { 1208 1242 return handleErr 1209 1243 } ··· 1213 1247 func (s *Server) HandleComAtprotoSyncRequestCrawl(c echo.Context) error { 1214 1248 ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncRequestCrawl") 1215 1249 defer span.End() 1216 - host := c.QueryParam("host") 1250 + hostname := c.QueryParam("hostname") 1217 1251 var handleErr error 1218 - // func (s *Server) 
handleComAtprotoSyncRequestCrawl(ctx context.Context,host string) error 1219 - handleErr = s.handleComAtprotoSyncRequestCrawl(ctx, host) 1252 + // func (s *Server) handleComAtprotoSyncRequestCrawl(ctx context.Context,hostname string) error 1253 + handleErr = s.handleComAtprotoSyncRequestCrawl(ctx, hostname) 1220 1254 if handleErr != nil { 1221 1255 return handleErr 1222 1256 }
+25 -10
repomgr/repomgr.go
··· 174 174 return nil 175 175 } 176 176 177 + func (rm *RepoManager) CarStore() *carstore.CarStore { 178 + return rm.cs 179 + } 180 + 177 181 func (rm *RepoManager) CreateRecord(ctx context.Context, user util.Uid, collection string, rec cbg.CBORMarshaler) (string, cid.Cid, error) { 178 182 ctx, span := otel.Tracer("repoman").Start(ctx, "CreateRecord") 179 183 defer span.End() ··· 535 539 536 540 r, err := repo.OpenRepo(ctx, ds, root, true) 537 541 if err != nil { 538 - return fmt.Errorf("opening external user repo (%d): %w", uid, err) 542 + return fmt.Errorf("opening external user repo (%d, root=%s): %w", uid, root, err) 539 543 } 540 544 541 545 repoDid := r.RepoDid() ··· 967 971 968 972 var commits []cid.Cid 969 973 for head != nil && *head != until { 970 - 971 974 commits = append(commits, *head) 972 975 rep, err := repo.OpenRepo(ctx, membs, *head, true) 973 976 if err != nil { 974 - return fmt.Errorf("opening repo for backwalk (%d commits, until: %s, head: %s): %w", len(commits), until, *head, err) 977 + return fmt.Errorf("opening repo for backwalk (%d commits, until: %s, head: %s, carRoot: %s): %w", len(commits), until, *head, carr.Header.Roots[0], err) 975 978 } 976 979 977 980 prev, err := rep.PrevCommit(ctx) ··· 982 985 head = prev 983 986 } 984 987 988 + if until.Defined() && (head == nil || *head != until) { 989 + // TODO: this shouldnt be happening, but i've seen some log messages 990 + // suggest that it might. Leaving this here to discover any cases where 991 + // it does. 
992 + log.Errorw("reached end of walkback without finding our 'until' commit", 993 + "until", until, 994 + "root", carr.Header.Roots[0], 995 + "commits", len(commits), 996 + "head", head, 997 + ) 998 + } 999 + 985 1000 // now we need to generate repo slices for each commit 986 1001 987 1002 seen := make(map[cid.Cid]bool) ··· 990 1005 seen[until] = true 991 1006 } 992 1007 993 - var baseBs blockstore.Blockstore 1008 + cbs := membs 994 1009 if until.Defined() { 995 1010 bs, err := rm.cs.ReadOnlySession(user) 996 1011 if err != nil { 997 1012 return err 998 1013 } 999 1014 1000 - baseBs = bs 1015 + // TODO: we technically only need this for the 'next' commit to diff against our current head. 1016 + cbs = util.NewReadThroughBstore(bs, membs) 1001 1017 } 1002 1018 1003 1019 prev := until 1004 1020 for i := len(commits) - 1; i >= 0; i-- { 1005 1021 root := commits[i] 1022 + // TODO: if there are blocks that get convergently recreated throughout 1023 + // the repos lifecycle, this will end up erroneously not including 1024 + // them. We should compute the set of blocks needed to read any repo 1025 + // ops that happened in the commit and use that for our 'output' blocks 1006 1026 cids, err := walkTree(ctx, seen, root, membs, true) 1007 1027 if err != nil { 1008 1028 return fmt.Errorf("walkTree: %w", err) ··· 1030 1050 1031 1051 finish := func(ctx context.Context) ([]byte, error) { 1032 1052 return ds.CloseWithRoot(ctx, root) 1033 - } 1034 - 1035 - cbs := membs 1036 - if i == len(commits)-1 { 1037 - cbs = util.NewReadThroughBstore(baseBs, membs) 1038 1053 } 1039 1054 1040 1055 if err := cb(ctx, prev, root, finish, cbs); err != nil {
+1 -1
testing/utils.go
··· 411 411 } 412 412 }) 413 413 414 - b, err := bgs.NewBGS(maindb, ix, repoman, evtman, didr, false) 414 + b, err := bgs.NewBGS(maindb, ix, repoman, evtman, didr, nil, false) 415 415 if err != nil { 416 416 return nil, err 417 417 }