this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

indexer: rip out a bunch of record processing code

-539
-539
indexer/indexer.go
··· 9 9 "time" 10 10 11 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 - bsky "github.com/bluesky-social/indigo/api/bsky" 13 12 "github.com/bluesky-social/indigo/did" 14 13 "github.com/bluesky-social/indigo/events" 15 14 lexutil "github.com/bluesky-social/indigo/lex/util" ··· 18 17 "github.com/bluesky-social/indigo/util" 19 18 "github.com/bluesky-social/indigo/xrpc" 20 19 21 - "github.com/ipfs/go-cid" 22 20 "go.opentelemetry.io/otel" 23 21 "gorm.io/gorm" 24 22 "gorm.io/gorm/clause" ··· 98 96 Action: string(op.Kind), 99 97 Cid: link, 100 98 }) 101 - 102 - if err := ix.handleRepoOp(ctx, evt, &op); err != nil { 103 - ix.log.Error("failed to handle repo op", "err", err) 104 - } 105 99 } 106 100 107 101 did, err := ix.DidForUser(ctx, evt.User) ··· 137 131 return nil 138 132 } 139 133 140 - func (ix *Indexer) handleRepoOp(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error { 141 - switch op.Kind { 142 - case repomgr.EvtKindCreateRecord: 143 - if ix.doAggregations { 144 - _, err := ix.handleRecordCreate(ctx, evt, op, true) 145 - if err != nil { 146 - return fmt.Errorf("handle recordCreate: %w", err) 147 - } 148 - } 149 - if ix.doSpider { 150 - if err := ix.crawlRecordReferences(ctx, op); err != nil { 151 - return err 152 - } 153 - } 154 - case repomgr.EvtKindDeleteRecord: 155 - if ix.doAggregations { 156 - if err := ix.handleRecordDelete(ctx, evt, op, true); err != nil { 157 - return fmt.Errorf("handle recordDelete: %w", err) 158 - } 159 - } 160 - case repomgr.EvtKindUpdateRecord: 161 - if ix.doAggregations { 162 - if err := ix.handleRecordUpdate(ctx, evt, op, true); err != nil { 163 - return fmt.Errorf("handle recordCreate: %w", err) 164 - } 165 - } 166 - default: 167 - return fmt.Errorf("unrecognized repo event type: %q", op.Kind) 168 - } 169 - 170 - return nil 171 - } 172 - 173 - func (ix *Indexer) crawlAtUriRef(ctx context.Context, uri string) error { 174 - puri, err := util.ParseAtUri(uri) 175 - if err != nil { 176 - return err 177 - } 178 - 179 - referencesCrawled.Inc() 180 - 181 - _, err = ix.GetUserOrMissing(ctx, puri.Did) 182 - if err != nil { 183 - return err 184 - } 185 - return nil 186 - } 187 - func (ix *Indexer) crawlRecordReferences(ctx context.Context, op *repomgr.RepoOp) error { 188 - ctx, span := otel.Tracer("indexer").Start(ctx, "crawlRecordReferences") 189 - defer span.End() 190 - 191 - switch rec := op.Record.(type) { 192 - case *bsky.FeedPost: 193 - for _, e := range rec.Entities { 194 - if e.Type == "mention" { 195 - _, err := ix.GetUserOrMissing(ctx, e.Value) 196 - if err != nil { 197 - ix.log.Info("failed to parse user mention", "ref", e.Value, "err", err) 198 - } 199 - } 200 - } 201 - 202 - if rec.Reply != nil { 203 - if rec.Reply.Parent != nil { 204 - if err := ix.crawlAtUriRef(ctx, rec.Reply.Parent.Uri); err != nil { 205 - ix.log.Info("failed to crawl reply parent", "cid", op.RecCid, "replyuri", rec.Reply.Parent.Uri, "err", err) 206 - } 207 - } 208 - 209 - if rec.Reply.Root != nil { 210 - if err := ix.crawlAtUriRef(ctx, rec.Reply.Root.Uri); err != nil { 211 - ix.log.Info("failed to crawl reply root", "cid", op.RecCid, "rooturi", rec.Reply.Root.Uri, "err", err) 212 - } 213 - } 214 - } 215 - 216 - return nil 217 - case *bsky.FeedRepost: 218 - if rec.Subject != nil { 219 - if err := ix.crawlAtUriRef(ctx, rec.Subject.Uri); err != nil { 220 - ix.log.Info("failed to crawl repost subject", "cid", op.RecCid, "subjecturi", rec.Subject.Uri, "err", err) 221 - } 222 - } 223 - return nil 224 - case *bsky.FeedLike: 225 - if rec.Subject != nil { 226 - if err := ix.crawlAtUriRef(ctx, rec.Subject.Uri); err != nil { 227 - ix.log.Info("failed to crawl like subject", "cid", op.RecCid, "subjecturi", rec.Subject.Uri, "err", err) 228 - } 229 - } 230 - return nil 231 - case *bsky.GraphFollow: 232 - _, err := ix.GetUserOrMissing(ctx, rec.Subject) 233 - if err != nil { 234 - ix.log.Info("failed to crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err) 235 - } 236 - return nil 237 - case *bsky.GraphBlock: 238 - _, err := ix.GetUserOrMissing(ctx, rec.Subject) 239 - if err != nil { 240 - ix.log.Info("failed to crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err) 241 - } 242 - return nil 243 - case *bsky.ActorProfile: 244 - return nil 245 - case *bsky.GraphList: 246 - return nil 247 - case *bsky.GraphListitem: 248 - return nil 249 - case *bsky.FeedGenerator: 250 - return nil 251 - default: 252 - ix.log.Warn("unrecognized record type (crawling references)", "record", op.Record, "collection", op.Collection) 253 - return nil 254 - } 255 - } 256 - 257 134 func (ix *Indexer) GetUserOrMissing(ctx context.Context, did string) (*models.ActorInfo, error) { 258 135 ctx, span := otel.Tracer("indexer").Start(ctx, "getUserOrMissing") 259 136 defer span.End() ··· 390 267 391 268 return &post, nil 392 269 } 393 - 394 - func (ix *Indexer) handleRecordDelete(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp, local bool) error { 395 - ix.log.Debug("record delete event", "collection", op.Collection) 396 - 397 - switch op.Collection { 398 - case "app.bsky.feed.post": 399 - u, err := ix.LookupUser(ctx, evt.User) 400 - if err != nil { 401 - return err 402 - } 403 - 404 - uri := "at://" + u.Did + "/app.bsky.feed.post/" + op.Rkey 405 - 406 - // NB: currently not using the 'or missing' variant here. If we delete 407 - // something that we've never seen before, maybe just dont bother? 408 - fp, err := ix.GetPost(ctx, uri) 409 - if err != nil { 410 - if errors.Is(err, gorm.ErrRecordNotFound) { 411 - ix.log.Warn("deleting post weve never seen before. Weird.", "user", evt.User, "rkey", op.Rkey) 412 - return nil 413 - } 414 - return err 415 - } 416 - 417 - if err := ix.db.Model(models.FeedPost{}).Where("id = ?", fp.ID).UpdateColumn("deleted", true).Error; err != nil { 418 - return err 419 - } 420 - case "app.bsky.feed.repost": 421 - if err := ix.db.Where("reposter = ? AND rkey = ?", evt.User, op.Rkey).Delete(&models.RepostRecord{}).Error; err != nil { 422 - return err 423 - } 424 - case "app.bsky.feed.vote": 425 - return ix.handleRecordDeleteFeedLike(ctx, evt, op) 426 - case "app.bsky.graph.follow": 427 - return ix.handleRecordDeleteGraphFollow(ctx, evt, op) 428 - case "app.bsky.graph.confirmation": 429 - return nil 430 - default: 431 - return fmt.Errorf("unrecognized record type (delete): %q", op.Collection) 432 - } 433 - 434 - return nil 435 - } 436 - 437 - func (ix *Indexer) handleRecordDeleteFeedLike(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error { 438 - var vr models.VoteRecord 439 - if err := ix.db.Find(&vr, "voter = ? AND rkey = ?", evt.User, op.Rkey).Error; err != nil { 440 - return err 441 - } 442 - 443 - if err := ix.db.Transaction(func(tx *gorm.DB) error { 444 - tx.Statement.RaiseErrorOnNotFound = true 445 - if err := tx.Model(models.VoteRecord{}).Where("id = ?", vr.ID).Delete(&vr).Error; err != nil { 446 - return err 447 - } 448 - 449 - if err := tx.Model(models.FeedPost{}).Where("id = ?", vr.Post).Update("up_count", gorm.Expr("up_count - 1")).Error; err != nil { 450 - return err 451 - } 452 - 453 - return nil 454 - }); err != nil { 455 - return err 456 - } 457 - 458 - ix.log.Warn("need to delete vote notification") 459 - return nil 460 - } 461 - 462 - func (ix *Indexer) handleRecordDeleteGraphFollow(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error { 463 - q := ix.db.Where("follower = ? AND rkey = ?", evt.User, op.Rkey).Delete(&models.FollowRecord{}) 464 - if err := q.Error; err != nil { 465 - return err 466 - } 467 - 468 - if q.RowsAffected == 0 { 469 - ix.log.Warn("attempted to delete follow we did not have a record for", "user", evt.User, "rkey", op.Rkey) 470 - return nil 471 - } 472 - 473 - return nil 474 - } 475 - 476 - func (ix *Indexer) handleRecordCreate(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp, local bool) ([]uint, error) { 477 - ix.log.Debug("record create event", "collection", op.Collection) 478 - 479 - var out []uint 480 - switch rec := op.Record.(type) { 481 - case *bsky.FeedPost: 482 - if err := ix.handleRecordCreateFeedPost(ctx, evt.User, op.Rkey, *op.RecCid, rec); err != nil { 483 - return nil, err 484 - } 485 - case *bsky.FeedRepost: 486 - fp, err := ix.GetPostOrMissing(ctx, rec.Subject.Uri) 487 - if err != nil { 488 - return nil, err 489 - } 490 - 491 - author, err := ix.LookupUser(ctx, fp.Author) 492 - if err != nil { 493 - return nil, err 494 - } 495 - 496 - out = append(out, author.PDS) 497 - 498 - rr := models.RepostRecord{ 499 - RecCreated: rec.CreatedAt, 500 - Post: fp.ID, 501 - Reposter: evt.User, 502 - Author: fp.Author, 503 - RecCid: op.RecCid.String(), 504 - Rkey: op.Rkey, 505 - } 506 - if err := ix.db.Create(&rr).Error; err != nil { 507 - return nil, err 508 - } 509 - 510 - case *bsky.FeedLike: 511 - return nil, ix.handleRecordCreateFeedLike(ctx, rec, evt, op) 512 - case *bsky.GraphFollow: 513 - return out, ix.handleRecordCreateGraphFollow(ctx, rec, evt, op) 514 - case *bsky.GraphBlock: 515 - return out, nil 516 - case *bsky.GraphList: 517 - return out, nil 518 - case *bsky.GraphListitem: 519 - return out, nil 520 - case *bsky.FeedGenerator: 521 - return out, nil 522 - case *bsky.ActorProfile: 523 - ix.log.Debug("TODO: got actor profile record creation, need to do something with this") 524 - default: 525 - ix.log.Warn("unrecognized record", "record", op.Record, "collection", op.Collection) 526 - return nil, fmt.Errorf("unrecognized record type (creation): %s", op.Collection) 527 - } 528 - 529 - return out, nil 530 - } 531 - 532 - func (ix *Indexer) handleRecordCreateFeedLike(ctx context.Context, rec *bsky.FeedLike, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error { 533 - post, err := ix.GetPostOrMissing(ctx, rec.Subject.Uri) 534 - if err != nil { 535 - return err 536 - } 537 - 538 - act, err := ix.LookupUser(ctx, post.Author) 539 - if err != nil { 540 - return err 541 - } 542 - 543 - vr := models.VoteRecord{ 544 - Voter: evt.User, 545 - Post: post.ID, 546 - Created: rec.CreatedAt, 547 - Rkey: op.Rkey, 548 - Cid: op.RecCid.String(), 549 - } 550 - if err := ix.db.Create(&vr).Error; err != nil { 551 - return err 552 - } 553 - 554 - if err := ix.db.Model(models.FeedPost{}).Where("id = ?", post.ID).Update("up_count", gorm.Expr("up_count + 1")).Error; err != nil { 555 - return err 556 - } 557 - if err := ix.addNewVoteNotification(ctx, act.Uid, &vr); err != nil { 558 - return err 559 - } 560 - 561 - return nil 562 - } 563 - 564 - func (ix *Indexer) handleRecordCreateGraphFollow(ctx context.Context, rec *bsky.GraphFollow, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error { 565 - subj, err := ix.LookupUserByDid(ctx, rec.Subject) 566 - if err != nil { 567 - if !errors.Is(err, gorm.ErrRecordNotFound) { 568 - return fmt.Errorf("failed to lookup user: %w", err) 569 - } 570 - 571 - nu, err := ix.createMissingUserRecord(ctx, rec.Subject) 572 - if err != nil { 573 - return fmt.Errorf("create external user: %w", err) 574 - } 575 - 576 - subj = nu 577 - } 578 - 579 - // 'follower' followed 'target' 580 - fr := models.FollowRecord{ 581 - Follower: evt.User, 582 - Target: subj.Uid, 583 - Rkey: op.Rkey, 584 - Cid: op.RecCid.String(), 585 - } 586 - if err := ix.db.Create(&fr).Error; err != nil { 587 - return err 588 - } 589 - 590 - return nil 591 - } 592 - 593 - func (ix *Indexer) handleRecordUpdate(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp, local bool) error { 594 - ix.log.Debug("record update event", "collection", op.Collection) 595 - 596 - switch rec := op.Record.(type) { 597 - case *bsky.FeedPost: 598 - u, err := ix.LookupUser(ctx, evt.User) 599 - if err != nil { 600 - return err 601 - } 602 - 603 - uri := "at://" + u.Did + "/app.bsky.feed.post/" + op.Rkey 604 - fp, err := ix.GetPostOrMissing(ctx, uri) 605 - if err != nil { 606 - return err 607 - } 608 - 609 - oldReply := fp.ReplyTo != 0 610 - newReply := rec.Reply != nil 611 - 612 - if oldReply != newReply { 613 - // the 'replyness' of the post was changed... that's weird 614 - ix.log.Error("need to properly handle case where reply-ness of posts is changed") 615 - return nil 616 - } 617 - 618 - if newReply { 619 - replyto, err := ix.GetPostOrMissing(ctx, rec.Reply.Parent.Uri) 620 - if err != nil { 621 - return err 622 - } 623 - 624 - if replyto.ID != fp.ReplyTo { 625 - ix.log.Error("post was changed to be a reply to a different post") 626 - return nil 627 - } 628 - } 629 - 630 - if err := ix.db.Model(models.FeedPost{}).Where("id = ?", fp.ID).UpdateColumn("cid", op.RecCid.String()).Error; err != nil { 631 - return err 632 - } 633 - 634 - return nil 635 - case *bsky.FeedRepost: 636 - var rr models.RepostRecord 637 - if err := ix.db.First(&rr, "reposter = ? AND rkey = ?", evt.User, op.Rkey).Error; err != nil { 638 - return err 639 - } 640 - 641 - // TODO: check if the post changed and do something about that 642 - 643 - rr.RecCreated = rec.CreatedAt 644 - rr.RecCid = op.RecCid.String() 645 - 646 - if err := ix.db.Save(&rr).Error; err != nil { 647 - return err 648 - } 649 - 650 - case *bsky.FeedLike: 651 - var vr models.VoteRecord 652 - if err := ix.db.Find(&vr, "voted = ? AND rkey = ?", evt.User, op.Rkey).Error; err != nil { 653 - return err 654 - } 655 - 656 - fp, err := ix.GetPostOrMissing(ctx, rec.Subject.Uri) 657 - if err != nil { 658 - return err 659 - } 660 - 661 - if vr.Post != fp.ID { 662 - // vote is on a completely different post, delete old one, create new one 663 - if err := ix.handleRecordDeleteFeedLike(ctx, evt, op); err != nil { 664 - return err 665 - } 666 - 667 - return ix.handleRecordCreateFeedLike(ctx, rec, evt, op) 668 - } 669 - 670 - return ix.handleRecordCreateFeedLike(ctx, rec, evt, op) 671 - case *bsky.GraphFollow: 672 - if err := ix.handleRecordDeleteGraphFollow(ctx, evt, op); err != nil { 673 - return err 674 - } 675 - 676 - return ix.handleRecordCreateGraphFollow(ctx, rec, evt, op) 677 - case *bsky.ActorProfile: 678 - ix.log.Debug("TODO: got actor profile record update, need to do something with this") 679 - default: 680 - return fmt.Errorf("unrecognized record type (update): %s", op.Collection) 681 - } 682 - 683 - return nil 684 - } 685 - 686 - func (ix *Indexer) GetPostOrMissing(ctx context.Context, uri string) (*models.FeedPost, error) { 687 - puri, err := util.ParseAtUri(uri) 688 - if err != nil { 689 - return nil, err 690 - } 691 - 692 - var post models.FeedPost 693 - if err := ix.db.Find(&post, "rkey = ? AND author = (?)", puri.Rkey, ix.db.Model(models.ActorInfo{}).Where("did = ?", puri.Did).Select("id")).Error; err != nil { 694 - return nil, err 695 - } 696 - 697 - if post.ID == 0 { 698 - // reply to a post we don't know about, create a record for it anyway 699 - return ix.createMissingPostRecord(ctx, puri) 700 - } 701 - 702 - return &post, nil 703 - } 704 - 705 - func (ix *Indexer) handleRecordCreateFeedPost(ctx context.Context, user models.Uid, rkey string, rcid cid.Cid, rec *bsky.FeedPost) error { 706 - var replyid uint 707 - if rec.Reply != nil { 708 - replyto, err := ix.GetPostOrMissing(ctx, rec.Reply.Parent.Uri) 709 - if err != nil { 710 - return err 711 - } 712 - 713 - replyid = replyto.ID 714 - 715 - rootref, err := ix.GetPostOrMissing(ctx, rec.Reply.Root.Uri) 716 - if err != nil { 717 - return err 718 - } 719 - 720 - // TODO: use this for indexing? 721 - _ = rootref 722 - } 723 - 724 - var mentions []*models.ActorInfo 725 - for _, e := range rec.Entities { 726 - if e.Type == "mention" { 727 - ai, err := ix.GetUserOrMissing(ctx, e.Value) 728 - if err != nil { 729 - return err 730 - } 731 - 732 - mentions = append(mentions, ai) 733 - } 734 - } 735 - 736 - var maybe models.FeedPost 737 - if err := ix.db.Find(&maybe, "rkey = ? AND author = ?", rkey, user).Error; err != nil { 738 - return err 739 - } 740 - 741 - fp := models.FeedPost{ 742 - Rkey: rkey, 743 - Cid: rcid.String(), 744 - Author: user, 745 - ReplyTo: replyid, 746 - } 747 - 748 - if maybe.ID != 0 { 749 - // we're likely filling in a missing reference 750 - if !maybe.Missing { 751 - // TODO: we've already processed this record creation 752 - ix.log.Warn("potentially erroneous event, duplicate create", "rkey", rkey, "user", user) 753 - } 754 - 755 - if err := ix.db.Clauses(clause.OnConflict{ 756 - Columns: []clause.Column{clause.Column{Name: "rkey"}, clause.Column{Name: "author"}}, 757 - UpdateAll: true, 758 - }).Create(&fp).Error; err != nil { 759 - return err 760 - } 761 - 762 - } else { 763 - if err := ix.db.Create(&fp).Error; err != nil { 764 - return err 765 - } 766 - } 767 - 768 - if err := ix.addNewPostNotification(ctx, rec, &fp, mentions); err != nil { 769 - return err 770 - } 771 - 772 - return nil 773 - } 774 - 775 - func (ix *Indexer) createMissingPostRecord(ctx context.Context, puri *util.ParsedUri) (*models.FeedPost, error) { 776 - ix.log.Warn("creating missing post record") 777 - ai, err := ix.GetUserOrMissing(ctx, puri.Did) 778 - if err != nil { 779 - return nil, err 780 - } 781 - 782 - var fp models.FeedPost 783 - if err := ix.db.FirstOrCreate(&fp, models.FeedPost{ 784 - Author: ai.Uid, 785 - Rkey: puri.Rkey, 786 - Missing: true, 787 - }).Error; err != nil { 788 - return nil, err 789 - } 790 - 791 - return &fp, nil 792 - } 793 - 794 - func (ix *Indexer) addNewPostNotification(ctx context.Context, post *bsky.FeedPost, fp *models.FeedPost, mentions []*models.ActorInfo) error { 795 - if post.Reply != nil { 796 - _, err := ix.GetPost(ctx, post.Reply.Parent.Uri) 797 - if err != nil { 798 - ix.log.Error("probably shouldn't error when processing a reply to a not-found post") 799 - return err 800 - } 801 - } 802 - 803 - return nil 804 - } 805 - 806 - func (ix *Indexer) addNewVoteNotification(ctx context.Context, postauthor models.Uid, vr *models.VoteRecord) error { 807 - return nil 808 - }