this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

events: add sync event support to persist implementations (#1020)

Adds basic support for the new `#sync` event to persister
implementations. Note that the pebble persister (used in rainbow) is
agnostic to event type, so didn't need any update.

This PR is a bit drive-by: the new relay work doesn't use this copy of
the persist code, and these patches haven't been demonstrated to work.

authored by

bnewbold and committed by
GitHub
1c879a77 2c892757

+74
+54
events/dbpersist/dbpersist.go
··· 171 171 switch { 172 172 case e.RepoCommit != nil: 173 173 e.RepoCommit.Seq = int64(item.Seq) 174 + case e.RepoSync != nil: 175 + e.RepoSync.Seq = int64(item.Seq) 174 176 case e.RepoHandle != nil: 175 177 e.RepoHandle.Seq = int64(item.Seq) 176 178 case e.RepoIdentity != nil: ··· 215 217 switch { 216 218 case e.RepoCommit != nil: 217 219 rer, err = p.RecordFromRepoCommit(ctx, e.RepoCommit) 220 + if err != nil { 221 + return err 222 + } 223 + case e.RepoSync != nil: 224 + rer, err = p.RecordFromRepoSync(ctx, e.RepoSync) 218 225 if err != nil { 219 226 return err 220 227 } ··· 371 378 return &rer, nil 372 379 } 373 380 381 + func (p *DbPersistence) RecordFromRepoSync(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync) (*RepoEventRecord, error) { 382 + 383 + uid, err := p.uidForDid(ctx, evt.Did) 384 + if err != nil { 385 + return nil, err 386 + } 387 + 388 + t, err := time.Parse(util.ISO8601, evt.Time) 389 + if err != nil { 390 + return nil, err 391 + } 392 + 393 + rer := RepoEventRecord{ 394 + Repo: uid, 395 + Type: "repo_sync", 396 + Time: t, 397 + Rev: evt.Rev, 398 + } 399 + 400 + return &rer, nil 401 + } 402 + 374 403 func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { 375 404 pageSize := 1000 376 405 ··· 449 478 switch { 450 479 case record.Commit != nil: 451 480 streamEvent, err = p.hydrateCommit(ctx, record) 481 + case record.Type == "repo_sync": 482 + streamEvent, err = p.hydrateSyncEvent(ctx, record) 452 483 case record.NewHandle != nil: 453 484 streamEvent, err = p.hydrateHandleChange(ctx, record) 454 485 case record.Type == "repo_identity": ··· 637 668 } 638 669 639 670 return &events.XRPCStreamEvent{RepoCommit: out}, nil 671 + } 672 + 673 + func (p *DbPersistence) hydrateSyncEvent(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) { 674 + 675 + did, err := p.didForUid(ctx, rer.Repo) 676 + if err != nil { 677 + return nil, err 678 + } 679 + 680 + evt := &comatproto.SyncSubscribeRepos_Sync{ 681 + Seq: int64(rer.Seq), 682 + Did: did, 683 + Time: rer.Time.Format(util.ISO8601), 684 + Rev: rer.Rev, 685 + } 686 + 687 + cs, err := p.readCarSlice(ctx, rer) 688 + if err != nil { 689 + return nil, fmt.Errorf("read car slice: %w", err) 690 + } 691 + evt.Blocks = cs 692 + 693 + return &events.XRPCStreamEvent{RepoSync: evt}, nil 640 694 } 641 695 642 696 func (p *DbPersistence) readCarSlice(ctx context.Context, rer *RepoEventRecord) ([]byte, error) {
+18
events/diskpersist/diskpersist.go
··· 282 282 evtKindTombstone = 3 283 283 evtKindIdentity = 4 284 284 evtKindAccount = 5 285 + evtKindSync = 6 285 286 ) 286 287 287 288 var emptyHeader = make([]byte, headerSize) ··· 455 456 switch { 456 457 case e.RepoCommit != nil: 457 458 e.RepoCommit.Seq = seq 459 + case e.RepoSync != nil: 460 + e.RepoSync.Seq = seq 458 461 case e.RepoHandle != nil: 459 462 e.RepoHandle.Seq = seq 460 463 case e.RepoIdentity != nil: ··· 507 510 evtKind = evtKindCommit 508 511 did = e.RepoCommit.Repo 509 512 if err := e.RepoCommit.MarshalCBOR(cw); err != nil { 513 + return fmt.Errorf("failed to marshal: %w", err) 514 + } 515 + case e.RepoSync != nil: 516 + evtKind = evtKindSync 517 + did = e.RepoSync.Did 518 + if err := e.RepoSync.MarshalCBOR(cw); err != nil { 510 519 return fmt.Errorf("failed to marshal: %w", err) 511 520 } 512 521 case e.RepoHandle != nil: ··· 743 752 } 744 753 evt.Seq = h.Seq 745 754 if err := cb(&events.XRPCStreamEvent{RepoCommit: &evt}); err != nil { 755 + return nil, err 756 + } 757 + case evtKindSync: 758 + var evt atproto.SyncSubscribeRepos_Sync 759 + if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil { 760 + return nil, err 761 + } 762 + evt.Seq = h.Seq 763 + if err := cb(&events.XRPCStreamEvent{RepoSync: &evt}); err != nil { 746 764 return nil, err 747 765 } 748 766 case evtKindHandle:
+2
events/yolopersist/yolopersist.go
··· 28 28 switch { 29 29 case e.RepoCommit != nil: 30 30 e.RepoCommit.Seq = yp.seq 31 + case e.RepoSync != nil: 32 + e.RepoSync.Seq = yp.seq 31 33 case e.RepoHandle != nil: 32 34 e.RepoHandle.Seq = yp.seq 33 35 case e.RepoIdentity != nil: