this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

make it optional to hydrate records in internal events (#397)

authored by

Whyrusleeping and committed by
GitHub
ec2345f1 5a338fa2

+97 -63
+1 -1
bgs/compactor.go
··· 334 334 ) 335 335 336 336 if shardCount == 0 { 337 - shardCount = 50 337 + shardCount = 20 338 338 } 339 339 340 340 span.SetAttributes(attribute.Int("clampedShardCount", shardCount))
+1 -1
cmd/bigsky/main.go
··· 307 307 if err := ix.HandleRepoEvent(ctx, evt); err != nil { 308 308 log.Errorw("failed to handle repo event", "err", err) 309 309 } 310 - }) 310 + }, false) 311 311 312 312 var blobstore blobs.BlobStore 313 313 if bsdir := cctx.String("disk-blob-store"); bsdir != "" {
+1 -1
cmd/supercollider/main.go
··· 272 272 TotalDesiredEvents: cctx.Int("total-events"), 273 273 } 274 274 275 - repoman.SetEventHandler(s.HandleRepoEvent) 275 + repoman.SetEventHandler(s.HandleRepoEvent, false) 276 276 277 277 // HTTP Server setup and Middleware Plumbing 278 278 e := echo.New()
+1 -1
pds/server.go
··· 104 104 if err := ix.HandleRepoEvent(ctx, evt); err != nil { 105 105 log.Errorw("handle repo event failed", "user", evt.User, "err", err) 106 106 } 107 - }) 107 + }, true) 108 108 109 109 //ix.SendRemoteFollow = s.sendRemoteFollow 110 110 ix.CreateExternalUser = s.createExternalUser
+92 -58
repomgr/repomgr.go
··· 45 45 SignForUser(context.Context, string, []byte) ([]byte, error) 46 46 } 47 47 48 - func (rm *RepoManager) SetEventHandler(cb func(context.Context, *RepoEvent)) { 48 + func (rm *RepoManager) SetEventHandler(cb func(context.Context, *RepoEvent), hydrateRecords bool) { 49 49 rm.events = cb 50 + rm.hydrateRecords = hydrateRecords 50 51 } 51 52 52 53 type RepoManager struct { ··· 56 57 lklk sync.Mutex 57 58 userLocks map[models.Uid]*userLock 58 59 59 - events func(context.Context, *RepoEvent) 60 + events func(context.Context, *RepoEvent) 61 + hydrateRecords bool 60 62 } 61 63 62 64 type ActorInfo struct { ··· 250 252 } 251 253 252 254 if rm.events != nil { 255 + op := RepoOp{ 256 + Kind: EvtKindUpdateRecord, 257 + Collection: collection, 258 + Rkey: rkey, 259 + RecCid: &cc, 260 + } 261 + 262 + if rm.hydrateRecords { 263 + op.Record = rec 264 + } 265 + 253 266 rm.events(ctx, &RepoEvent{ 254 - User: user, 255 - OldRoot: oldroot, 256 - NewRoot: nroot, 257 - Rev: nrev, 258 - Since: &rev, 259 - Ops: []RepoOp{{ 260 - Kind: EvtKindUpdateRecord, 261 - Collection: collection, 262 - Rkey: rkey, 263 - Record: rec, 264 - RecCid: &cc, 265 - }}, 267 + User: user, 268 + OldRoot: oldroot, 269 + NewRoot: nroot, 270 + Rev: nrev, 271 + Since: &rev, 272 + Ops: []RepoOp{op}, 266 273 RepoSlice: rslice, 267 274 }) 268 275 } ··· 372 379 } 373 380 374 381 if rm.events != nil { 382 + op := RepoOp{ 383 + Kind: EvtKindCreateRecord, 384 + Collection: "app.bsky.actor.profile", 385 + Rkey: "self", 386 + } 387 + 388 + if rm.hydrateRecords { 389 + op.Record = profile 390 + } 391 + 375 392 rm.events(ctx, &RepoEvent{ 376 - User: user, 377 - NewRoot: root, 378 - Rev: nrev, 379 - Ops: []RepoOp{{ 380 - Kind: EvtKindCreateRecord, 381 - Collection: "app.bsky.actor.profile", 382 - Rkey: "self", 383 - Record: profile, 384 - }}, 393 + User: user, 394 + NewRoot: root, 395 + Rev: nrev, 396 + Ops: []RepoOp{op}, 385 397 RepoSlice: rslice, 386 398 }) 387 399 } ··· 522 534 523 535 switch EventKind(op.Action) { 524 536 case EvtKindCreateRecord: 525 - recid, rec, err := r.GetRecord(ctx, op.Path) 526 - if err != nil { 527 - return fmt.Errorf("reading changed record from car slice: %w", err) 528 - } 529 - 530 - evtops = append(evtops, RepoOp{ 537 + rop := RepoOp{ 531 538 Kind: EvtKindCreateRecord, 532 539 Collection: parts[0], 533 540 Rkey: parts[1], 534 - Record: rec, 535 - RecCid: &recid, 536 - }) 537 - case EvtKindUpdateRecord: 538 - recid, rec, err := r.GetRecord(ctx, op.Path) 539 - if err != nil { 540 - return fmt.Errorf("reading changed record from car slice: %w", err) 541 + RecCid: (*cid.Cid)(op.Cid), 541 542 } 542 543 543 - evtops = append(evtops, RepoOp{ 544 + if rm.hydrateRecords { 545 + _, rec, err := r.GetRecord(ctx, op.Path) 546 + if err != nil { 547 + return fmt.Errorf("reading changed record from car slice: %w", err) 548 + } 549 + rop.Record = rec 550 + } 551 + 552 + evtops = append(evtops, rop) 553 + case EvtKindUpdateRecord: 554 + rop := RepoOp{ 544 555 Kind: EvtKindUpdateRecord, 545 556 Collection: parts[0], 546 557 Rkey: parts[1], 547 - Record: rec, 548 - RecCid: &recid, 549 - }) 558 + RecCid: (*cid.Cid)(op.Cid), 559 + } 560 + 561 + if rm.hydrateRecords { 562 + _, rec, err := r.GetRecord(ctx, op.Path) 563 + if err != nil { 564 + return fmt.Errorf("reading changed record from car slice: %w", err) 565 + } 566 + 567 + rop.Record = rec 568 + } 569 + 570 + evtops = append(evtops, rop) 550 571 case EvtKindDeleteRecord: 551 572 evtops = append(evtops, RepoOp{ 552 573 Kind: EvtKindDeleteRecord, ··· 624 645 return err 625 646 } 626 647 627 - ops = append(ops, RepoOp{ 648 + op := RepoOp{ 628 649 Kind: EvtKindCreateRecord, 629 650 Collection: c.Collection, 630 651 Rkey: rkey, 631 652 RecCid: &cc, 632 - Record: c.Value.Val, 633 - }) 653 + } 654 + 655 + if rm.hydrateRecords { 656 + op.Record = c.Value.Val 657 + } 658 + 659 + ops = append(ops, op) 634 660 case w.RepoApplyWrites_Update != nil: 635 661 u := w.RepoApplyWrites_Update 636 662 ··· 639 665 return err 640 666 } 641 667 642 - ops = append(ops, RepoOp{ 668 + op := RepoOp{ 643 669 Kind: EvtKindUpdateRecord, 644 670 Collection: u.Collection, 645 671 Rkey: u.Rkey, 646 672 RecCid: &cc, 647 - Record: u.Value.Val, 648 - }) 673 + } 674 + 675 + if rm.hydrateRecords { 676 + op.Record = u.Value.Val 677 + } 678 + 679 + ops = append(ops, op) 649 680 case w.RepoApplyWrites_Delete != nil: 650 681 d := w.RepoApplyWrites_Delete 651 682 ··· 740 771 var ops []RepoOp 741 772 for _, op := range diffops { 742 773 repoOpsImported.Inc() 743 - out, err := processOp(ctx, bs, op) 774 + out, err := processOp(ctx, bs, op, rm.hydrateRecords) 744 775 if err != nil { 745 776 log.Errorw("failed to process repo op", "err", err, "path", op.Rpath, "repo", repoDid) 746 777 } ··· 776 807 return nil 777 808 } 778 809 779 - func processOp(ctx context.Context, bs blockstore.Blockstore, op *mst.DiffOp) (*RepoOp, error) { 810 + func processOp(ctx context.Context, bs blockstore.Blockstore, op *mst.DiffOp, hydrateRecords bool) (*RepoOp, error) { 780 811 parts := strings.SplitN(op.Rpath, "/", 2) 781 812 if len(parts) != 2 { 782 813 return nil, fmt.Errorf("repo mst had invalid rpath: %q", op.Rpath) ··· 784 815 785 816 switch op.Op { 786 817 case "add", "mut": 787 - blk, err := bs.Get(ctx, op.NewCid) 788 - if err != nil { 789 - return nil, err 790 - } 791 818 792 819 kind := EvtKindCreateRecord 793 820 if op.Op == "mut" { ··· 801 828 RecCid: &op.NewCid, 802 829 } 803 830 804 - rec, err := lexutil.CborDecodeValue(blk.RawData()) 805 - if err != nil { 806 - if !errors.Is(err, lexutil.ErrUnrecognizedType) { 831 + if hydrateRecords { 832 + blk, err := bs.Get(ctx, op.NewCid) 833 + if err != nil { 807 834 return nil, err 808 835 } 809 836 810 - log.Warnf("failed processing repo diff: %s", err) 811 - } else { 812 - outop.Record = rec 837 + rec, err := lexutil.CborDecodeValue(blk.RawData()) 838 + if err != nil { 839 + if !errors.Is(err, lexutil.ErrUnrecognizedType) { 840 + return nil, err 841 + } 842 + 843 + log.Warnf("failed processing repo diff: %s", err) 844 + } else { 845 + outop.Record = rec 846 + } 813 847 } 814 848 815 849 return outop, nil
+1 -1
testing/utils.go
··· 445 445 if err := ix.HandleRepoEvent(ctx, evt); err != nil { 446 446 fmt.Println("test bgs failed to handle repo event", err) 447 447 } 448 - }) 448 + }, true) // TODO: actually want this to be false, but some tests use this to confirm the BGS has seen certain records 449 449 450 450 tr := &api.TestHandleResolver{} 451 451