this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

remove a bunch of indexer functionality (#971)

This is on top of https://github.com/bluesky-social/indigo/pull/970

authored by

bnewbold and committed by
GitHub
3ceb27fe ad5bcded

+22 -594
+1 -6
cmd/bigsky/main.go
··· 98 98 Name: "crawl-insecure-ws", 99 99 Usage: "when connecting to PDS instances, use ws:// instead of wss://", 100 100 }, 101 - &cli.BoolFlag{ 102 - Name: "spidering", 103 - Value: false, 104 - EnvVars: []string{"RELAY_SPIDERING", "BGS_SPIDERING"}, 105 - }, 106 101 &cli.StringFlag{ 107 102 Name: "api-listen", 108 103 Value: ":2470", ··· 450 445 451 446 rf := indexer.NewRepoFetcher(db, repoman, cctx.Int("max-fetch-concurrency")) 452 447 453 - ix, err := indexer.NewIndexer(db, evtman, cachedidr, rf, true, false, cctx.Bool("spidering")) 448 + ix, err := indexer.NewIndexer(db, evtman, cachedidr, rf, true) 454 449 if err != nil { 455 450 return err 456 451 }
+4 -577
indexer/indexer.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "database/sql" 6 5 "errors" 7 6 "fmt" 8 7 "log/slog" 9 8 "time" 10 9 11 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 - bsky "github.com/bluesky-social/indigo/api/bsky" 13 11 "github.com/bluesky-social/indigo/did" 14 12 "github.com/bluesky-social/indigo/events" 15 13 lexutil "github.com/bluesky-social/indigo/lex/util" ··· 18 16 "github.com/bluesky-social/indigo/util" 19 17 "github.com/bluesky-social/indigo/xrpc" 20 18 21 - "github.com/ipfs/go-cid" 22 19 "go.opentelemetry.io/otel" 23 20 "gorm.io/gorm" 24 - "gorm.io/gorm/clause" 25 21 ) 26 22 27 23 const MaxEventSliceLength = 1000000 ··· 35 31 36 32 Crawler *CrawlDispatcher 37 33 38 - doAggregations bool 39 - doSpider bool 40 - 41 34 SendRemoteFollow func(context.Context, string, uint) error 42 35 CreateExternalUser func(context.Context, string) (*models.ActorInfo, error) 43 36 ApplyPDSClientSettings func(*xrpc.Client) ··· 45 38 log *slog.Logger 46 39 } 47 40 48 - func NewIndexer(db *gorm.DB, evtman *events.EventManager, didr did.Resolver, fetcher *RepoFetcher, crawl, aggregate, spider bool) (*Indexer, error) { 41 + func NewIndexer(db *gorm.DB, evtman *events.EventManager, didr did.Resolver, fetcher *RepoFetcher, crawl bool) (*Indexer, error) { 49 42 db.AutoMigrate(&models.FeedPost{}) 50 43 db.AutoMigrate(&models.ActorInfo{}) 51 44 db.AutoMigrate(&models.FollowRecord{}) ··· 53 46 db.AutoMigrate(&models.RepostRecord{}) 54 47 55 48 ix := &Indexer{ 56 - db: db, 57 - events: evtman, 58 - didr: didr, 59 - doAggregations: aggregate, 60 - doSpider: spider, 49 + db: db, 50 + events: evtman, 51 + didr: didr, 61 52 SendRemoteFollow: func(context.Context, string, uint) error { 62 53 return nil 63 54 }, ··· 98 89 Action: string(op.Kind), 99 90 Cid: link, 100 91 }) 101 - 102 - if err := ix.handleRepoOp(ctx, evt, &op); err != nil { 103 - ix.log.Error("failed to handle repo op", "err", err) 104 - } 105 92 } 106 93 107 94 did, err := ix.DidForUser(ctx, evt.User) ··· 137 124 return nil 138 125 } 139 126 140 - func (ix *Indexer) handleRepoOp(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error { 141 - switch op.Kind { 142 - case repomgr.EvtKindCreateRecord: 143 - if ix.doAggregations { 144 - _, err := ix.handleRecordCreate(ctx, evt, op, true) 145 - if err != nil { 146 - return fmt.Errorf("handle recordCreate: %w", err) 147 - } 148 - } 149 - if ix.doSpider { 150 - if err := ix.crawlRecordReferences(ctx, op); err != nil { 151 - return err 152 - } 153 - } 154 - case repomgr.EvtKindDeleteRecord: 155 - if ix.doAggregations { 156 - if err := ix.handleRecordDelete(ctx, evt, op, true); err != nil { 157 - return fmt.Errorf("handle recordDelete: %w", err) 158 - } 159 - } 160 - case repomgr.EvtKindUpdateRecord: 161 - if ix.doAggregations { 162 - if err := ix.handleRecordUpdate(ctx, evt, op, true); err != nil { 163 - return fmt.Errorf("handle recordCreate: %w", err) 164 - } 165 - } 166 - default: 167 - return fmt.Errorf("unrecognized repo event type: %q", op.Kind) 168 - } 169 - 170 - return nil 171 - } 172 - 173 - func (ix *Indexer) crawlAtUriRef(ctx context.Context, uri string) error { 174 - puri, err := util.ParseAtUri(uri) 175 - if err != nil { 176 - return err 177 - } 178 - 179 - referencesCrawled.Inc() 180 - 181 - _, err = ix.GetUserOrMissing(ctx, puri.Did) 182 - if err != nil { 183 - return err 184 - } 185 - return nil 186 - } 187 - func (ix *Indexer) crawlRecordReferences(ctx context.Context, op *repomgr.RepoOp) error { 188 - ctx, span := otel.Tracer("indexer").Start(ctx, "crawlRecordReferences") 189 - defer span.End() 190 - 191 - switch rec := op.Record.(type) { 192 - case *bsky.FeedPost: 193 - for _, e := range rec.Entities { 194 - if e.Type == "mention" { 195 - _, err := ix.GetUserOrMissing(ctx, e.Value) 196 - if err != nil { 197 - ix.log.Info("failed to parse user mention", "ref", e.Value, "err", err) 198 - } 199 - } 200 - } 201 - 202 - if rec.Reply != nil { 203 - if rec.Reply.Parent != nil { 204 - if err := ix.crawlAtUriRef(ctx, rec.Reply.Parent.Uri); err != nil { 205 - ix.log.Info("failed to crawl reply parent", "cid", op.RecCid, "replyuri", rec.Reply.Parent.Uri, "err", err) 206 - } 207 - } 208 - 209 - if rec.Reply.Root != nil { 210 - if err := ix.crawlAtUriRef(ctx, rec.Reply.Root.Uri); err != nil { 211 - ix.log.Info("failed to crawl reply root", "cid", op.RecCid, "rooturi", rec.Reply.Root.Uri, "err", err) 212 - } 213 - } 214 - } 215 - 216 - return nil 217 - case *bsky.FeedRepost: 218 - if rec.Subject != nil { 219 - if err := ix.crawlAtUriRef(ctx, rec.Subject.Uri); err != nil { 220 - ix.log.Info("failed to crawl repost subject", "cid", op.RecCid, "subjecturi", rec.Subject.Uri, "err", err) 221 - } 222 - } 223 - return nil 224 - case *bsky.FeedLike: 225 - if rec.Subject != nil { 226 - if err := ix.crawlAtUriRef(ctx, rec.Subject.Uri); err != nil { 227 - ix.log.Info("failed to crawl like subject", "cid", op.RecCid, "subjecturi", rec.Subject.Uri, "err", err) 228 - } 229 - } 230 - return nil 231 - case *bsky.GraphFollow: 232 - _, err := ix.GetUserOrMissing(ctx, rec.Subject) 233 - if err != nil { 234 - ix.log.Info("failed to crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err) 235 - } 236 - return nil 237 - case *bsky.GraphBlock: 238 - _, err := ix.GetUserOrMissing(ctx, rec.Subject) 239 - if err != nil { 240 - ix.log.Info("failed to crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err) 241 - } 242 - return nil 243 - case *bsky.ActorProfile: 244 - return nil 245 - case *bsky.GraphList: 246 - return nil 247 - case *bsky.GraphListitem: 248 - return nil 249 - case *bsky.FeedGenerator: 250 - return nil 251 - default: 252 - ix.log.Warn("unrecognized record type (crawling references)", "record", op.Record, "collection", op.Collection) 253 - return nil 254 - } 255 - } 256 - 257 127 func (ix *Indexer) GetUserOrMissing(ctx context.Context, did string) (*models.ActorInfo, error) { 258 128 ctx, span := otel.Tracer("indexer").Start(ctx, "getUserOrMissing") 259 129 defer span.End() ··· 342 212 return &ai, nil 343 213 } 344 214 345 - func (ix *Indexer) handleInitActor(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error { 346 - ai := op.ActorInfo 347 - 348 - if err := ix.db.Clauses(clause.OnConflict{ 349 - Columns: []clause.Column{{Name: "uid"}}, 350 - UpdateAll: true, 351 - }).Create(&models.ActorInfo{ 352 - Uid: evt.User, 353 - Handle: sql.NullString{String: ai.Handle, Valid: true}, 354 - Did: ai.Did, 355 - DisplayName: ai.DisplayName, 356 - Type: ai.Type, 357 - PDS: evt.PDS, 358 - }).Error; err != nil { 359 - return fmt.Errorf("initializing new actor info: %w", err) 360 - } 361 - 362 - if err := ix.db.Create(&models.FollowRecord{ 363 - Follower: evt.User, 364 - Target: evt.User, 365 - }).Error; err != nil { 366 - return err 367 - } 368 - 369 - return nil 370 - } 371 - 372 215 func isNotFound(err error) bool { 373 216 if errors.Is(err, gorm.ErrRecordNotFound) { 374 217 return true ··· 390 233 391 234 return &post, nil 392 235 } 393 - 394 - func (ix *Indexer) handleRecordDelete(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp, local bool) error { 395 - ix.log.Debug("record delete event", "collection", op.Collection) 396 - 397 - switch op.Collection { 398 - case "app.bsky.feed.post": 399 - u, err := ix.LookupUser(ctx, evt.User) 400 - if err != nil { 401 - return err 402 - } 403 - 404 - uri := "at://" + u.Did + "/app.bsky.feed.post/" + op.Rkey 405 - 406 - // NB: currently not using the 'or missing' variant here. If we delete 407 - // something that we've never seen before, maybe just dont bother? 408 - fp, err := ix.GetPost(ctx, uri) 409 - if err != nil { 410 - if errors.Is(err, gorm.ErrRecordNotFound) { 411 - ix.log.Warn("deleting post weve never seen before. Weird.", "user", evt.User, "rkey", op.Rkey) 412 - return nil 413 - } 414 - return err 415 - } 416 - 417 - if err := ix.db.Model(models.FeedPost{}).Where("id = ?", fp.ID).UpdateColumn("deleted", true).Error; err != nil { 418 - return err 419 - } 420 - case "app.bsky.feed.repost": 421 - if err := ix.db.Where("reposter = ? AND rkey = ?", evt.User, op.Rkey).Delete(&models.RepostRecord{}).Error; err != nil { 422 - return err 423 - } 424 - case "app.bsky.feed.vote": 425 - return ix.handleRecordDeleteFeedLike(ctx, evt, op) 426 - case "app.bsky.graph.follow": 427 - return ix.handleRecordDeleteGraphFollow(ctx, evt, op) 428 - case "app.bsky.graph.confirmation": 429 - return nil 430 - default: 431 - return fmt.Errorf("unrecognized record type (delete): %q", op.Collection) 432 - } 433 - 434 - return nil 435 - } 436 - 437 - func (ix *Indexer) handleRecordDeleteFeedLike(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error { 438 - var vr models.VoteRecord 439 - if err := ix.db.Find(&vr, "voter = ? AND rkey = ?", evt.User, op.Rkey).Error; err != nil { 440 - return err 441 - } 442 - 443 - if err := ix.db.Transaction(func(tx *gorm.DB) error { 444 - tx.Statement.RaiseErrorOnNotFound = true 445 - if err := tx.Model(models.VoteRecord{}).Where("id = ?", vr.ID).Delete(&vr).Error; err != nil { 446 - return err 447 - } 448 - 449 - if err := tx.Model(models.FeedPost{}).Where("id = ?", vr.Post).Update("up_count", gorm.Expr("up_count - 1")).Error; err != nil { 450 - return err 451 - } 452 - 453 - return nil 454 - }); err != nil { 455 - return err 456 - } 457 - 458 - ix.log.Warn("need to delete vote notification") 459 - return nil 460 - } 461 - 462 - func (ix *Indexer) handleRecordDeleteGraphFollow(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error { 463 - q := ix.db.Where("follower = ? AND rkey = ?", evt.User, op.Rkey).Delete(&models.FollowRecord{}) 464 - if err := q.Error; err != nil { 465 - return err 466 - } 467 - 468 - if q.RowsAffected == 0 { 469 - ix.log.Warn("attempted to delete follow we did not have a record for", "user", evt.User, "rkey", op.Rkey) 470 - return nil 471 - } 472 - 473 - return nil 474 - } 475 - 476 - func (ix *Indexer) handleRecordCreate(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp, local bool) ([]uint, error) { 477 - ix.log.Debug("record create event", "collection", op.Collection) 478 - 479 - var out []uint 480 - switch rec := op.Record.(type) { 481 - case *bsky.FeedPost: 482 - if err := ix.handleRecordCreateFeedPost(ctx, evt.User, op.Rkey, *op.RecCid, rec); err != nil { 483 - return nil, err 484 - } 485 - case *bsky.FeedRepost: 486 - fp, err := ix.GetPostOrMissing(ctx, rec.Subject.Uri) 487 - if err != nil { 488 - return nil, err 489 - } 490 - 491 - author, err := ix.LookupUser(ctx, fp.Author) 492 - if err != nil { 493 - return nil, err 494 - } 495 - 496 - out = append(out, author.PDS) 497 - 498 - rr := models.RepostRecord{ 499 - RecCreated: rec.CreatedAt, 500 - Post: fp.ID, 501 - Reposter: evt.User, 502 - Author: fp.Author, 503 - RecCid: op.RecCid.String(), 504 - Rkey: op.Rkey, 505 - } 506 - if err := ix.db.Create(&rr).Error; err != nil { 507 - return nil, err 508 - } 509 - 510 - case *bsky.FeedLike: 511 - return nil, ix.handleRecordCreateFeedLike(ctx, rec, evt, op) 512 - case *bsky.GraphFollow: 513 - return out, ix.handleRecordCreateGraphFollow(ctx, rec, evt, op) 514 - case *bsky.GraphBlock: 515 - return out, nil 516 - case *bsky.GraphList: 517 - return out, nil 518 - case *bsky.GraphListitem: 519 - return out, nil 520 - case *bsky.FeedGenerator: 521 - return out, nil 522 - case *bsky.ActorProfile: 523 - ix.log.Debug("TODO: got actor profile record creation, need to do something with this") 524 - default: 525 - ix.log.Warn("unrecognized record", "record", op.Record, "collection", op.Collection) 526 - return nil, fmt.Errorf("unrecognized record type (creation): %s", op.Collection) 527 - } 528 - 529 - return out, nil 530 - } 531 - 532 - func (ix *Indexer) handleRecordCreateFeedLike(ctx context.Context, rec *bsky.FeedLike, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error { 533 - post, err := ix.GetPostOrMissing(ctx, rec.Subject.Uri) 534 - if err != nil { 535 - return err 536 - } 537 - 538 - act, err := ix.LookupUser(ctx, post.Author) 539 - if err != nil { 540 - return err 541 - } 542 - 543 - vr := models.VoteRecord{ 544 - Voter: evt.User, 545 - Post: post.ID, 546 - Created: rec.CreatedAt, 547 - Rkey: op.Rkey, 548 - Cid: op.RecCid.String(), 549 - } 550 - if err := ix.db.Create(&vr).Error; err != nil { 551 - return err 552 - } 553 - 554 - if err := ix.db.Model(models.FeedPost{}).Where("id = ?", post.ID).Update("up_count", gorm.Expr("up_count + 1")).Error; err != nil { 555 - return err 556 - } 557 - if err := ix.addNewVoteNotification(ctx, act.Uid, &vr); err != nil { 558 - return err 559 - } 560 - 561 - return nil 562 - } 563 - 564 - func (ix *Indexer) handleRecordCreateGraphFollow(ctx context.Context, rec *bsky.GraphFollow, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error { 565 - subj, err := ix.LookupUserByDid(ctx, rec.Subject) 566 - if err != nil { 567 - if !errors.Is(err, gorm.ErrRecordNotFound) { 568 - return fmt.Errorf("failed to lookup user: %w", err) 569 - } 570 - 571 - nu, err := ix.createMissingUserRecord(ctx, rec.Subject) 572 - if err != nil { 573 - return fmt.Errorf("create external user: %w", err) 574 - } 575 - 576 - subj = nu 577 - } 578 - 579 - // 'follower' followed 'target' 580 - fr := models.FollowRecord{ 581 - Follower: evt.User, 582 - Target: subj.Uid, 583 - Rkey: op.Rkey, 584 - Cid: op.RecCid.String(), 585 - } 586 - if err := ix.db.Create(&fr).Error; err != nil { 587 - return err 588 - } 589 - 590 - return nil 591 - } 592 - 593 - func (ix *Indexer) handleRecordUpdate(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp, local bool) error { 594 - ix.log.Debug("record update event", "collection", op.Collection) 595 - 596 - switch rec := op.Record.(type) { 597 - case *bsky.FeedPost: 598 - u, err := ix.LookupUser(ctx, evt.User) 599 - if err != nil { 600 - return err 601 - } 602 - 603 - uri := "at://" + u.Did + "/app.bsky.feed.post/" + op.Rkey 604 - fp, err := ix.GetPostOrMissing(ctx, uri) 605 - if err != nil { 606 - return err 607 - } 608 - 609 - oldReply := fp.ReplyTo != 0 610 - newReply := rec.Reply != nil 611 - 612 - if oldReply != newReply { 613 - // the 'replyness' of the post was changed... that's weird 614 - ix.log.Error("need to properly handle case where reply-ness of posts is changed") 615 - return nil 616 - } 617 - 618 - if newReply { 619 - replyto, err := ix.GetPostOrMissing(ctx, rec.Reply.Parent.Uri) 620 - if err != nil { 621 - return err 622 - } 623 - 624 - if replyto.ID != fp.ReplyTo { 625 - ix.log.Error("post was changed to be a reply to a different post") 626 - return nil 627 - } 628 - } 629 - 630 - if err := ix.db.Model(models.FeedPost{}).Where("id = ?", fp.ID).UpdateColumn("cid", op.RecCid.String()).Error; err != nil { 631 - return err 632 - } 633 - 634 - return nil 635 - case *bsky.FeedRepost: 636 - var rr models.RepostRecord 637 - if err := ix.db.First(&rr, "reposter = ? AND rkey = ?", evt.User, op.Rkey).Error; err != nil { 638 - return err 639 - } 640 - 641 - // TODO: check if the post changed and do something about that 642 - 643 - rr.RecCreated = rec.CreatedAt 644 - rr.RecCid = op.RecCid.String() 645 - 646 - if err := ix.db.Save(&rr).Error; err != nil { 647 - return err 648 - } 649 - 650 - case *bsky.FeedLike: 651 - var vr models.VoteRecord 652 - if err := ix.db.Find(&vr, "voted = ? AND rkey = ?", evt.User, op.Rkey).Error; err != nil { 653 - return err 654 - } 655 - 656 - fp, err := ix.GetPostOrMissing(ctx, rec.Subject.Uri) 657 - if err != nil { 658 - return err 659 - } 660 - 661 - if vr.Post != fp.ID { 662 - // vote is on a completely different post, delete old one, create new one 663 - if err := ix.handleRecordDeleteFeedLike(ctx, evt, op); err != nil { 664 - return err 665 - } 666 - 667 - return ix.handleRecordCreateFeedLike(ctx, rec, evt, op) 668 - } 669 - 670 - return ix.handleRecordCreateFeedLike(ctx, rec, evt, op) 671 - case *bsky.GraphFollow: 672 - if err := ix.handleRecordDeleteGraphFollow(ctx, evt, op); err != nil { 673 - return err 674 - } 675 - 676 - return ix.handleRecordCreateGraphFollow(ctx, rec, evt, op) 677 - case *bsky.ActorProfile: 678 - ix.log.Debug("TODO: got actor profile record update, need to do something with this") 679 - default: 680 - return fmt.Errorf("unrecognized record type (update): %s", op.Collection) 681 - } 682 - 683 - return nil 684 - } 685 - 686 - func (ix *Indexer) GetPostOrMissing(ctx context.Context, uri string) (*models.FeedPost, error) { 687 - puri, err := util.ParseAtUri(uri) 688 - if err != nil { 689 - return nil, err 690 - } 691 - 692 - var post models.FeedPost 693 - if err := ix.db.Find(&post, "rkey = ? AND author = (?)", puri.Rkey, ix.db.Model(models.ActorInfo{}).Where("did = ?", puri.Did).Select("id")).Error; err != nil { 694 - return nil, err 695 - } 696 - 697 - if post.ID == 0 { 698 - // reply to a post we don't know about, create a record for it anyway 699 - return ix.createMissingPostRecord(ctx, puri) 700 - } 701 - 702 - return &post, nil 703 - } 704 - 705 - func (ix *Indexer) handleRecordCreateFeedPost(ctx context.Context, user models.Uid, rkey string, rcid cid.Cid, rec *bsky.FeedPost) error { 706 - var replyid uint 707 - if rec.Reply != nil { 708 - replyto, err := ix.GetPostOrMissing(ctx, rec.Reply.Parent.Uri) 709 - if err != nil { 710 - return err 711 - } 712 - 713 - replyid = replyto.ID 714 - 715 - rootref, err := ix.GetPostOrMissing(ctx, rec.Reply.Root.Uri) 716 - if err != nil { 717 - return err 718 - } 719 - 720 - // TODO: use this for indexing? 721 - _ = rootref 722 - } 723 - 724 - var mentions []*models.ActorInfo 725 - for _, e := range rec.Entities { 726 - if e.Type == "mention" { 727 - ai, err := ix.GetUserOrMissing(ctx, e.Value) 728 - if err != nil { 729 - return err 730 - } 731 - 732 - mentions = append(mentions, ai) 733 - } 734 - } 735 - 736 - var maybe models.FeedPost 737 - if err := ix.db.Find(&maybe, "rkey = ? AND author = ?", rkey, user).Error; err != nil { 738 - return err 739 - } 740 - 741 - fp := models.FeedPost{ 742 - Rkey: rkey, 743 - Cid: rcid.String(), 744 - Author: user, 745 - ReplyTo: replyid, 746 - } 747 - 748 - if maybe.ID != 0 { 749 - // we're likely filling in a missing reference 750 - if !maybe.Missing { 751 - // TODO: we've already processed this record creation 752 - ix.log.Warn("potentially erroneous event, duplicate create", "rkey", rkey, "user", user) 753 - } 754 - 755 - if err := ix.db.Clauses(clause.OnConflict{ 756 - Columns: []clause.Column{clause.Column{Name: "rkey"}, clause.Column{Name: "author"}}, 757 - UpdateAll: true, 758 - }).Create(&fp).Error; err != nil { 759 - return err 760 - } 761 - 762 - } else { 763 - if err := ix.db.Create(&fp).Error; err != nil { 764 - return err 765 - } 766 - } 767 - 768 - if err := ix.addNewPostNotification(ctx, rec, &fp, mentions); err != nil { 769 - return err 770 - } 771 - 772 - return nil 773 - } 774 - 775 - func (ix *Indexer) createMissingPostRecord(ctx context.Context, puri *util.ParsedUri) (*models.FeedPost, error) { 776 - ix.log.Warn("creating missing post record") 777 - ai, err := ix.GetUserOrMissing(ctx, puri.Did) 778 - if err != nil { 779 - return nil, err 780 - } 781 - 782 - var fp models.FeedPost 783 - if err := ix.db.FirstOrCreate(&fp, models.FeedPost{ 784 - Author: ai.Uid, 785 - Rkey: puri.Rkey, 786 - Missing: true, 787 - }).Error; err != nil { 788 - return nil, err 789 - } 790 - 791 - return &fp, nil 792 - } 793 - 794 - func (ix *Indexer) addNewPostNotification(ctx context.Context, post *bsky.FeedPost, fp *models.FeedPost, mentions []*models.ActorInfo) error { 795 - if post.Reply != nil { 796 - _, err := ix.GetPost(ctx, post.Reply.Parent.Uri) 797 - if err != nil { 798 - ix.log.Error("probably shouldn't error when processing a reply to a not-found post") 799 - return err 800 - } 801 - } 802 - 803 - return nil 804 - } 805 - 806 - func (ix *Indexer) addNewVoteNotification(ctx context.Context, postauthor models.Uid, vr *models.VoteRecord) error { 807 - return nil 808 - }
+1 -1
indexer/posts_test.go
··· 61 61 62 62 rf := NewRepoFetcher(maindb, repoman, 10) 63 63 64 - ix, err := NewIndexer(maindb, evtman, didr, rf, false, true, true) 64 + ix, err := NewIndexer(maindb, evtman, didr, rf, false) 65 65 if err != nil { 66 66 t.Fatal(err) 67 67 }
+1 -1
pds/server.go
··· 72 72 73 73 rf := indexer.NewRepoFetcher(db, repoman, 10) 74 74 75 - ix, err := indexer.NewIndexer(db, evtman, didr, rf, false, true, true) 75 + ix, err := indexer.NewIndexer(db, evtman, didr, rf, false) 76 76 if err != nil { 77 77 return nil, err 78 78 }
+11 -5
testing/integ_test.go
··· 185 185 // Now, the relay will discover a gap, and have to catch up somehow 186 186 socialSim(t, users2, 1, 0) 187 187 188 - time.Sleep(time.Second) 189 - 190 188 // we expect the relay to learn about posts that it did not directly see from 191 189 // repos its already partially scraped, as long as its seen *something* after the missing post 192 190 // this is the 'catchup' process 191 + _ = p2posts2 192 + /* NOTE: BGS doesn't support indexing any more 193 + time.Sleep(time.Second) 193 194 ctx := context.Background() 194 195 _, err := b1.bgs.Index.GetPost(ctx, p2posts2[4].Uri) 195 196 if err != nil { 196 197 t.Fatal(err) 197 198 } 199 + */ 198 200 } 199 201 200 202 func TestRelayMultiGap(t *testing.T) { ··· 229 231 p2posts := socialSim(t, users2, 10, 0) 230 232 231 233 users[0].Reply(t, p2posts[0], p2posts[0], "what a wonderful life") 232 - time.Sleep(time.Second * 2) 233 234 235 + /* NOTE: BGS doesn't support indexing any more 236 + time.Sleep(time.Second * 2) 234 237 ctx := context.Background() 235 238 _, err := b1.bgs.Index.GetPost(ctx, p2posts[3].Uri) 236 239 if err != nil { 237 240 t.Fatal(err) 238 241 } 242 + */ 239 243 240 244 // now if we make posts on pds 2, the relay will not hear about those new posts 241 245 ··· 249 253 250 254 // Now, the relay will discover a gap, and have to catch up somehow 251 255 socialSim(t, users2, 1, 0) 252 - 253 - time.Sleep(time.Second * 2) 254 256 255 257 // we expect the relay to learn about posts that it did not directly see from 256 258 // repos its already partially scraped, as long as its seen *something* after the missing post 257 259 // this is the 'catchup' process 260 + _ = p2posts2 261 + /* NOTE: BGS doesn't support indexing any more 262 + time.Sleep(time.Second * 2) 258 263 _, err = b1.bgs.Index.GetPost(ctx, p2posts2[4].Uri) 259 264 if err != nil { 260 265 t.Fatal(err) 261 266 } 267 + */ 262 268 } 263 269 264 270 func TestHandleChange(t *testing.T) {
+4 -4
testing/utils.go
··· 422 422 423 423 ctx := context.TODO() 424 424 _, err := atproto.RepoCreateRecord(ctx, u.client, &atproto.RepoCreateRecord_Input{ 425 - Collection: "app.bsky.feed.vote", 425 + Collection: "app.bsky.feed.like", 426 426 Repo: u.did, 427 427 Record: &lexutil.LexiconTypeDecoder{Val: &bsky.FeedLike{ 428 - LexiconTypeID: "app.bsky.feed.vote", 428 + LexiconTypeID: "app.bsky.feed.like", 429 429 CreatedAt: time.Now().Format(time.RFC3339), 430 430 Subject: post, 431 431 }}, ··· 566 566 evtman := events.NewEventManager(diskpersist) 567 567 rf := indexer.NewRepoFetcher(maindb, repoman, 10) 568 568 569 - ix, err := indexer.NewIndexer(maindb, evtman, didr, rf, true, true, true) 569 + ix, err := indexer.NewIndexer(maindb, evtman, didr, rf, true) 570 570 if err != nil { 571 571 return nil, err 572 572 } ··· 942 942 return cid.Undef, err 943 943 } 944 944 case "like": 945 - _, _, err := r.CreateRecord(ctx, "app.bsky.feed.vote", &bsky.FeedLike{ 945 + _, _, err := r.CreateRecord(ctx, "app.bsky.feed.like", &bsky.FeedLike{ 946 946 CreatedAt: time.Now().Format(bsutil.ISO8601), 947 947 Subject: &atproto.RepoStrongRef{ 948 948 Uri: RandFakeAtUri("app.bsky.feed.post", ""),