···431431 return nil
432432 }
433433434434- // TODO: if the user is already in the 'slow' path, we shouldnt even bother trying to fast path this event
434434+ // skip the fast path for rebases or if the user is already in the slow path
435435+ if evt.Rebase || bgs.Index.Crawler.RepoInSlowPath(ctx, host, u.ID) {
436436+ ai, err := bgs.Index.LookupUser(ctx, u.ID)
437437+ if err != nil {
438438+ return err
439439+ }
440440+441441+ return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
442442+ }
435443436444 if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, (*cid.Cid)(evt.Prev), evt.Blocks); err != nil {
437445 log.Warnw("failed handling event", "err", err, "host", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())
+90-14
carstore/bs.go
···7070 Seq int `gorm:"index"`
7171 Path string
7272 Usr util.Uid `gorm:"index"`
7373+ Rebase bool
7374}
74757576type blockRef struct {
···344345 if earlyCid.Defined() {
345346 var untilShard CarShard
346347 if err := cs.meta.First(&untilShard, "root = ? AND usr = ?", util.DbCID{earlyCid}, user).Error; err != nil {
347347- return err
348348+ return fmt.Errorf("finding early shard: %w", err)
348349 }
349350 earlySeq = untilShard.Seq
350351 }
···352353 if lateCid.Defined() {
353354 var fromShard CarShard
354355 if err := cs.meta.First(&fromShard, "root = ? AND usr = ?", util.DbCID{lateCid}, user).Error; err != nil {
355355- return err
356356+ return fmt.Errorf("finding late shard: %w", err)
356357 }
357358 lateSeq = fromShard.Seq
358359 }
···384385 }
385386386387 for _, sh := range shards {
387387- if err := cs.writeShardBlocks(ctx, &sh, w); err != nil {
388388- return err
388388+ // for rebase shards, only include the modified root, not the whole tree
389389+ if sh.Rebase && incremental {
390390+ if err := cs.writeBlockFromShard(ctx, &sh, w, sh.Root.CID); err != nil {
391391+ return err
392392+ }
393393+ } else {
394394+ if err := cs.writeShardBlocks(ctx, &sh, w); err != nil {
395395+ return err
396396+ }
389397 }
390398 }
391399···415423 return nil
416424}
417425426426+func (cs *CarStore) writeBlockFromShard(ctx context.Context, sh *CarShard, w io.Writer, c cid.Cid) error {
427427+ fi, err := os.Open(sh.Path)
428428+ if err != nil {
429429+ return err
430430+ }
431431+ defer fi.Close()
432432+433433+ rr, err := car.NewCarReader(fi)
434434+ if err != nil {
435435+ return err
436436+ }
437437+438438+ for {
439439+ blk, err := rr.Next()
440440+ if err != nil {
441441+ return err
442442+ }
443443+444444+ if blk.Cid() == c {
445445+ _, err := LdWrite(w, c.Bytes(), blk.RawData())
446446+ return err
447447+ }
448448+ }
449449+}
450450+418451var _ blockstore.Blockstore = (*DeltaSession)(nil)
419452420453func (ds *DeltaSession) Put(ctx context.Context, b blockformat.Block) error {
···515548// CloseWithRoot writes all new blocks in a car file to the writer with the
516549// given cid as the 'root'
517550func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid) ([]byte, error) {
518518- ctx, span := otel.Tracer("carstore").Start(ctx, "CloseWithRoot")
519519- defer span.End()
551551+ return ds.closeWithRoot(ctx, root, false)
552552+}
520553521521- if ds.readonly {
522522- return nil, fmt.Errorf("cannot write to readonly deltaSession")
523523- }
524524-525525- buf := new(bytes.Buffer)
554554+func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) {
526555 h := &car.CarHeader{
527556 Roots: []cid.Cid{root},
528557 Version: 1,
529558 }
530559 hb, err := cbor.DumpObject(h)
531560 if err != nil {
532532- return nil, err
561561+ return 0, err
533562 }
534563535535- hnw, err := LdWrite(buf, hb)
564564+ hnw, err := LdWrite(w, hb)
565565+ if err != nil {
566566+ return 0, err
567567+ }
568568+569569+ return hnw, nil
570570+}
571571+572572+func (ds *DeltaSession) closeWithRoot(ctx context.Context, root cid.Cid, rebase bool) ([]byte, error) {
573573+ ctx, span := otel.Tracer("carstore").Start(ctx, "CloseWithRoot")
574574+ defer span.End()
575575+576576+ if ds.readonly {
577577+ return nil, fmt.Errorf("cannot write to readonly deltaSession")
578578+ }
579579+580580+ buf := new(bytes.Buffer)
581581+ hnw, err := WriteCarHeader(buf, root)
536582 if err != nil {
537583 return nil, err
538584 }
···540586 // TODO: writing these blocks in map traversal order is bad, I believe the
541587 // optimal ordering will be something like reverse-write-order, but random
542588 // is definitely not it
543543- var offset int64 = hnw
589589+590590+ offset := hnw
544591 //brefs := make([]*blockRef, 0, len(ds.blks))
545592 brefs := make([]map[string]interface{}, 0, len(ds.blks))
546593 for k, blk := range ds.blks {
···602649 }
603650604651 return buf.Bytes(), nil
652652+}
653653+654654+func (ds *DeltaSession) CloseAsRebase(ctx context.Context, root cid.Cid) error {
655655+ _, err := ds.closeWithRoot(ctx, root, true)
656656+ if err != nil {
657657+ return err
658658+ }
659659+660660+ // TODO: this *could* get large, might be worth doing it incrementally
661661+ var oldslices []CarShard
662662+ if err := ds.cs.meta.Find(&oldslices, "usr = ? AND seq < ?", ds.user, ds.seq).Error; err != nil {
663663+ return err
664664+ }
665665+666666+ // If anything here fails, cleanup is straightforward. Simply look for any
667667+ // shard in the database with a higher seq shard marked as 'rebase'
668668+ for _, sl := range oldslices {
669669+ if err := os.Remove(sl.Path); err != nil {
670670+ if !os.IsNotExist(err) {
671671+ return err
672672+ }
673673+ }
674674+675675+ if err := ds.cs.meta.Delete(&sl).Error; err != nil {
676676+ return err
677677+ }
678678+ }
679679+680680+ return nil
605681}
606682607683func LdWrite(w io.Writer, d ...[]byte) (int64, error) {
+18-6
events/dbpersist.go
···2828 Commit util.DbCID
2929 Prev *util.DbCID
30303131- Time time.Time
3232- Blobs []byte
3333- Repo util.Uid
3434- Event string
3131+ Time time.Time
3232+ Blobs []byte
3333+ Repo util.Uid
3434+ Event string
3535+ Rebase bool
35363637 Ops []RepoOpRecord
3738}
···103104 Event: "repo_append", // TODO: refactor to "#commit"? can "rebase" come through this path?
104105 Blobs: blobs,
105106 Time: t,
107107+ Rebase: evt.Rebase,
106108 }
107109108110 for _, op := range evt.Ops {
···210212 Prev: prevCID,
211213 Time: rer.Time.Format(util.ISO8601),
212214 Blobs: blobCIDs,
215215+ Rebase: rer.Rebase,
213216 // TODO: there was previously an Event field here. are these all Commit, or are some other events?
214217 }
215218···229232230233 cs, err := p.readCarSlice(ctx, rer)
231234 if err != nil {
232232- return nil, fmt.Errorf("read car slice: %w", err)
235235+ return nil, fmt.Errorf("read car slice (%s): %w", rer.Commit.CID, err)
233236 }
234237235238 if len(cs) > carstore.MaxSliceLength {
···244247func (p *DbPersistence) readCarSlice(ctx context.Context, rer *RepoEventRecord) ([]byte, error) {
245248246249 var early cid.Cid
247247- if rer.Prev != nil {
250250+ if rer.Prev != nil && !rer.Rebase {
248251 early = rer.Prev.CID
249252 }
250253···257260}
258261259262func (p *DbPersistence) TakeDownRepo(ctx context.Context, usr util.Uid) error {
263263+ return p.deleteAllEventsForUser(ctx, usr)
264264+}
265265+266266+func (p *DbPersistence) deleteAllEventsForUser(ctx context.Context, usr util.Uid) error {
260267 for {
261268 q := p.db.Model(&RepoEventRecord{}).Where("repo = ?", usr).Limit(100).Select("seq")
262269 res := p.db.Where("repo_event_record_id in (?)", q).Delete(&RepoOpRecord{})
···275282276283 return nil
277284}
285285+286286+func (p *DbPersistence) RebaseRepoEvents(ctx context.Context, usr util.Uid) error {
287287+ // a little weird that this is the same action as a takedown
288288+ return p.deleteAllEventsForUser(ctx, usr)
289289+}
+4
events/events.go
···221221func (em *EventManager) TakeDownRepo(ctx context.Context, user util.Uid) error {
222222 return em.persister.TakeDownRepo(ctx, user)
223223}
224224+225225+func (em *EventManager) HandleRebase(ctx context.Context, user util.Uid) error {
226226+ return em.persister.RebaseRepoEvents(ctx, user)
227227+}
+5
events/persist.go
···1313 Persist(ctx context.Context, e *XRPCStreamEvent) error
1414 Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
1515 TakeDownRepo(ctx context.Context, usr util.Uid) error
1616+ RebaseRepoEvents(ctx context.Context, usr util.Uid) error
1617}
17181819// MemPersister is the most naive implementation of event persistence
···7374func (mp *MemPersister) TakeDownRepo(ctx context.Context, uid util.Uid) error {
7475 return fmt.Errorf("repo takedowns not currently supported by memory persister, test usage only")
7576}
7777+7878+func (mp *MemPersister) RebaseRepoEvents(ctx context.Context, usr util.Uid) error {
7979+ return fmt.Errorf("repo rebases not currently supported by memory persister, test usage only")
8080+}
+48-14
indexer/crawler.go
···33import (
44 "context"
55 "fmt"
66+ "sync"
6778 comatproto "github.com/bluesky-social/indigo/api/atproto"
89 "github.com/bluesky-social/indigo/models"
···20212122 complete chan util.Uid
22232424+ maplk sync.Mutex
2525+ todo map[util.Uid]*crawlWork
2626+ inProgress map[util.Uid]*crawlWork
2727+2328 doRepoCrawl func(context.Context, *crawlWork) error
24292530 concurrency int
···3742 catchup: make(chan *catchupJob),
3843 doRepoCrawl: repoFn,
3944 concurrency: concurrency,
4545+ todo: make(map[util.Uid]*crawlWork),
4646+ inProgress: make(map[util.Uid]*crawlWork),
4047 }, nil
4148}
4249···62696370 // for events that come in while this actor is being processed
6471 next []*catchupJob
7272+7373+ rebase *catchupJob
6574}
66756776func (c *CrawlDispatcher) mainLoop() {
6877 var next *crawlWork
6978 var buffer []*crawlWork
7070-7171- todo := make(map[util.Uid]*crawlWork)
7272- inProgress := make(map[util.Uid]*crawlWork)
73797480 var rs chan *crawlWork
7581 for {
···7783 case act := <-c.ingest:
7884 // TODO: max buffer size
79858080- _, ok := inProgress[act.Uid]
8686+ c.maplk.Lock()
8787+ _, ok := c.inProgress[act.Uid]
8188 if ok {
8989+ c.maplk.Unlock()
8290 break
8391 }
84928585- _, has := todo[act.Uid]
9393+ _, has := c.todo[act.Uid]
8694 if has {
9595+ c.maplk.Unlock()
8796 break
8897 }
8998···91100 act: act,
92101 initScrape: true,
93102 }
9494- todo[act.Uid] = cw
103103+ c.todo[act.Uid] = cw
104104+ c.maplk.Unlock()
9510596106 if next == nil {
97107 next = cw
···100110 buffer = append(buffer, cw)
101111 }
102112 case rs <- next:
103103- delete(todo, next.act.Uid)
104104- inProgress[next.act.Uid] = next
113113+ c.maplk.Lock()
114114+ delete(c.todo, next.act.Uid)
115115+ c.inProgress[next.act.Uid] = next
116116+ c.maplk.Unlock()
105117106118 if len(buffer) > 0 {
107119 next = buffer[0]
···111123 rs = nil
112124 }
113125 case catchup := <-c.catchup:
114114- job, ok := todo[catchup.user.Uid]
126126+ c.maplk.Lock()
127127+ job, ok := c.todo[catchup.user.Uid]
128128+ // TODO: in the event of receiving a rebase event, we *could* pre-empt all other pending events
115129 if ok {
116130 job.catchup = append(job.catchup, catchup)
131131+ c.maplk.Unlock()
117132 break
118133 }
119134120120- job, ok = inProgress[catchup.user.Uid]
135135+ job, ok = c.inProgress[catchup.user.Uid]
121136 if ok {
122137 job.next = append(job.next, catchup)
138138+ c.maplk.Unlock()
123139 break
124140 }
125141···127143 act: catchup.user,
128144 catchup: []*catchupJob{catchup},
129145 }
130130- todo[catchup.user.Uid] = cw
146146+ c.todo[catchup.user.Uid] = cw
147147+ c.maplk.Unlock()
131148132149 if next == nil {
133150 next = cw
···137154 }
138155139156 case uid := <-c.complete:
140140- job, ok := inProgress[uid]
157157+ c.maplk.Lock()
158158+ job, ok := c.inProgress[uid]
141159 if !ok {
142160 panic("should not be possible to not have a job in progress we receive a completion signal for")
143161 }
144144- delete(inProgress, uid)
162162+ delete(c.inProgress, uid)
145163146164 if len(job.next) > 0 {
147147- todo[uid] = job
165165+ c.todo[uid] = job
148166 job.initScrape = false
149167 job.catchup = job.next
150168 job.next = nil
···156174 }
157175 }
158176177177+ c.maplk.Unlock()
178178+159179 }
160180 }
161181}
···206226 return ctx.Err()
207227 }
208228}
229229+230230+func (c *CrawlDispatcher) RepoInSlowPath(ctx context.Context, host *models.PDS, uid util.Uid) bool {
231231+ c.maplk.Lock()
232232+ defer c.maplk.Unlock()
233233+ if _, ok := c.todo[uid]; ok {
234234+ return true
235235+ }
236236+237237+ if _, ok := c.inProgress[uid]; ok {
238238+ return true
239239+ }
240240+241241+ return false
242242+}
+27
indexer/indexer.go
···112112113113 }
114114115115+ if evt.Rebase {
116116+ if err := ix.events.HandleRebase(ctx, evt.User); err != nil {
117117+ log.Errorf("failed to handle rebase in events manager: %s", err)
118118+ }
119119+ }
120120+115121 log.Debugw("Sending event", "did", did)
116122 if err := ix.events.AddEvent(ctx, &events.XRPCStreamEvent{
117123 RepoCommit: &comatproto.SyncSubscribeRepos_Commit{
···122128 Time: time.Now().Format(util.ISO8601),
123129 Ops: outops,
124130 TooBig: toobig,
131131+ Rebase: evt.Rebase,
125132 },
126133 PrivUid: evt.User,
127134 }); err != nil {
···822829 curHead, err := ix.repomgr.GetRepoRoot(ctx, ai.Uid)
823830 if err != nil && !isNotFound(err) {
824831 return err
832832+ }
833833+834834+ var rebase *comatproto.SyncSubscribeRepos_Commit
835835+ for _, job := range job.catchup {
836836+ if job.evt.Rebase {
837837+ rebase = job.evt
838838+ break
839839+ }
840840+ }
841841+ if rebase == nil {
842842+ for _, job := range job.next {
843843+ if job.evt.Rebase {
844844+ rebase = job.evt
845845+ break
846846+ }
847847+ }
848848+ }
849849+850850+ if rebase != nil {
851851+ return ix.repomgr.HandleRebase(ctx, ai.PDS, ai.Uid, ai.Did, (*cid.Cid)(rebase.Prev), (cid.Cid)(rebase.Commit), rebase.Blocks)
825852 }
826853827854 var host string
+6-1
pds/handlers.go
···792792}
793793794794func (s *Server) handleComAtprotoRepoRebaseRepo(ctx context.Context, body *comatprototypes.RepoRebaseRepo_Input) error {
795795- panic("nyi")
795795+ u, err := s.getUser(ctx)
796796+ if err != nil {
797797+ return err
798798+ }
799799+800800+ return s.repoman.DoRebase(ctx, u.ID)
796801}
+45
repo/repo.go
···160160 return r.sc.Prev, nil
161161}
162162163163+func (r *Repo) DataCid() cid.Cid {
164164+ return r.sc.Data
165165+}
166166+163167func (r *Repo) SignedCommit() SignedCommit {
164168 return r.sc
165169}
···235239236240 r.mst = nmst
237241 return nil
242242+}
243243+244244+// truncates history while retaining the same data root
245245+func (r *Repo) Truncate() {
246246+ r.sc.Prev = nil
247247+ r.repoCid = cid.Undef
238248}
239249240250// creates and writes a new SignedCommit for this repo, with `prev` pointing to old value
···381391382392 return mst.DiffTrees(ctx, r.bs, oldTree, curptr)
383393}
394394+395395+func (r *Repo) CopyDataTo(ctx context.Context, bs blockstore.Blockstore) error {
396396+ return copyRecCbor(ctx, r.bs, bs, r.sc.Data, make(map[cid.Cid]struct{}))
397397+}
398398+399399+func copyRecCbor(ctx context.Context, from, to blockstore.Blockstore, c cid.Cid, seen map[cid.Cid]struct{}) error {
400400+ if _, ok := seen[c]; ok {
401401+ return nil
402402+ }
403403+ seen[c] = struct{}{}
404404+405405+ blk, err := from.Get(ctx, c)
406406+ if err != nil {
407407+ return err
408408+ }
409409+410410+ if err := to.Put(ctx, blk); err != nil {
411411+ return err
412412+ }
413413+414414+ var out []cid.Cid
415415+ if err := cbg.ScanForLinks(bytes.NewReader(blk.RawData()), func(c cid.Cid) {
416416+ out = append(out, c)
417417+ }); err != nil {
418418+ return err
419419+ }
420420+421421+ for _, child := range out {
422422+ if err := copyRecCbor(ctx, from, to, child, seen); err != nil {
423423+ return err
424424+ }
425425+ }
426426+427427+ return nil
428428+}