Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 1311 lines 32 kB view raw
1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "net/http" 13 "net/url" 14 "slices" 15 "sync" 16 17 "time" 18 19 "github.com/avast/retry-go/v4" 20 "github.com/bluesky-social/indigo/atproto/syntax" 21 jmodels "github.com/bluesky-social/jetstream/pkg/models" 22 "github.com/go-git/go-git/v5/plumbing" 23 "github.com/ipfs/go-cid" 24 "golang.org/x/sync/errgroup" 25 "tangled.org/core/api/tangled" 26 "tangled.org/core/appview/config" 27 "tangled.org/core/appview/db" 28 "tangled.org/core/appview/models" 29 "tangled.org/core/appview/serververify" 30 "tangled.org/core/appview/validator" 31 "tangled.org/core/idresolver" 32 "tangled.org/core/orm" 33 "tangled.org/core/rbac" 34) 35 36type Ingester struct { 37 Db db.DbWrapper 38 Enforcer *rbac.Enforcer 39 IdResolver *idresolver.Resolver 40 Config *config.Config 41 Logger *slog.Logger 42 Validator *validator.Validator 43} 44 45type processFunc func(ctx context.Context, e *jmodels.Event) error 46 47func (i *Ingester) Ingest() processFunc { 48 return func(ctx context.Context, e *jmodels.Event) error { 49 var err error 50 51 l := i.Logger.With("kind", e.Kind) 52 switch e.Kind { 53 case jmodels.EventKindAccount: 54 // TODO: sync account state to db 55 if e.Account.Active { 56 break 57 } 58 // TODO: revoke sessions by DID 59 if *e.Account.Status == "deactivated" { 60 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 61 } 62 case jmodels.EventKindIdentity: 63 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 64 case jmodels.EventKindCommit: 65 switch e.Commit.Collection { 66 case tangled.GraphFollowNSID: 67 err = i.ingestFollow(e) 68 case tangled.FeedStarNSID: 69 err = i.ingestStar(e) 70 case tangled.PublicKeyNSID: 71 err = i.ingestPublicKey(e) 72 case tangled.RepoArtifactNSID: 73 err = i.ingestArtifact(e) 74 case tangled.ActorProfileNSID: 75 err = i.ingestProfile(ctx, e) 76 case tangled.SpindleMemberNSID: 77 err = i.ingestSpindleMember(ctx, e) 78 case tangled.SpindleNSID: 79 err = i.ingestSpindle(ctx, e) 80 case tangled.KnotMemberNSID: 81 err = i.ingestKnotMember(e) 82 case tangled.KnotNSID: 83 err = i.ingestKnot(e) 84 case tangled.StringNSID: 85 err = i.ingestString(e) 86 case tangled.RepoIssueNSID: 87 err = i.ingestIssue(ctx, e) 88 case tangled.RepoPullNSID: 89 err = i.ingestPull(ctx, e) 90 case tangled.RepoIssueCommentNSID: 91 err = i.ingestIssueComment(e) 92 case tangled.LabelDefinitionNSID: 93 err = i.ingestLabelDefinition(e) 94 case tangled.LabelOpNSID: 95 err = i.ingestLabelOp(e) 96 } 97 l = i.Logger.With("nsid", e.Commit.Collection) 98 } 99 100 if err != nil { 101 l.Warn("failed to ingest record, skipping", "err", err) 102 } 103 104 lastTimeUs := e.TimeUS + 1 105 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 106 l.Error("failed to save cursor", "err", saveErr) 107 } 108 109 return nil 110 } 111} 112 113func (i *Ingester) ingestStar(e *jmodels.Event) error { 114 var err error 115 did := e.Did 116 117 l := i.Logger.With("handler", "ingestStar") 118 l = l.With("nsid", e.Commit.Collection) 119 120 switch e.Commit.Operation { 121 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 122 var subjectUri syntax.ATURI 123 124 raw := json.RawMessage(e.Commit.Record) 125 record := tangled.FeedStar{} 126 err := json.Unmarshal(raw, &record) 127 if err != nil { 128 l.Error("invalid record", "err", err) 129 return err 130 } 131 132 star := &models.Star{ 133 Did: did, 134 Rkey: e.Commit.RKey, 135 } 136 137 switch { 138 case record.SubjectDid != nil: 139 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid)) 140 if repoErr == nil { 141 subjectUri = repo.RepoAt() 142 star.RepoAt = subjectUri 143 } 144 case record.Subject != nil: 145 subjectUri, err = syntax.ParseATURI(*record.Subject) 146 if err != nil { 147 l.Error("invalid record", "err", err) 148 return err 149 } 150 star.RepoAt = subjectUri 151 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String()) 152 if repoErr == nil && repo.RepoDid != "" { 153 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil { 154 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 155 } 156 } 157 default: 158 l.Error("star record has neither subject nor subjectDid") 159 return fmt.Errorf("star record has neither subject nor subjectDid") 160 } 161 err = db.AddStar(i.Db, star) 162 case jmodels.CommitOperationDelete: 163 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 164 } 165 166 if err != nil { 167 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 168 } 169 170 return nil 171} 172 173func (i *Ingester) ingestFollow(e *jmodels.Event) error { 174 var err error 175 did := e.Did 176 177 l := i.Logger.With("handler", "ingestFollow") 178 l = l.With("nsid", e.Commit.Collection) 179 180 switch e.Commit.Operation { 181 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 182 raw := json.RawMessage(e.Commit.Record) 183 record := tangled.GraphFollow{} 184 err = json.Unmarshal(raw, &record) 185 if err != nil { 186 l.Error("invalid record", "err", err) 187 return err 188 } 189 190 err = db.AddFollow(i.Db, &models.Follow{ 191 UserDid: did, 192 SubjectDid: record.Subject, 193 Rkey: e.Commit.RKey, 194 }) 195 case jmodels.CommitOperationDelete: 196 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 197 } 198 199 if err != nil { 200 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 201 } 202 203 return nil 204} 205 206func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 207 did := e.Did 208 var err error 209 210 l := i.Logger.With("handler", "ingestPublicKey") 211 l = l.With("nsid", e.Commit.Collection) 212 213 switch e.Commit.Operation { 214 case jmodels.CommitOperationCreate: 215 l.Debug("processing add of pubkey") 216 raw := json.RawMessage(e.Commit.Record) 217 record := tangled.PublicKey{} 218 err = json.Unmarshal(raw, &record) 219 if err != nil { 220 l.Error("invalid record", "err", err) 221 return err 222 } 223 224 name := record.Name 225 key := record.Key 226 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 227 case jmodels.CommitOperationUpdate: 228 l.Debug("processing update of pubkey") 229 raw := json.RawMessage(e.Commit.Record) 230 record := tangled.PublicKey{} 231 err = json.Unmarshal(raw, &record) 232 if err != nil { 233 l.Error("invalid record", "err", err) 234 return err 235 } 236 237 name := record.Name 238 key := record.Key 239 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey) 240 case jmodels.CommitOperationDelete: 241 l.Debug("processing delete of pubkey") 242 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 243 } 244 245 if err != nil { 246 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 247 } 248 249 return nil 250} 251 252func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 253 did := e.Did 254 var err error 255 256 l := i.Logger.With("handler", "ingestArtifact") 257 l = l.With("nsid", e.Commit.Collection) 258 259 switch e.Commit.Operation { 260 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 261 raw := json.RawMessage(e.Commit.Record) 262 record := tangled.RepoArtifact{} 263 err = json.Unmarshal(raw, &record) 264 if err != nil { 265 l.Error("invalid record", "err", err) 266 return err 267 } 268 269 var repo *models.Repo 270 if record.RepoDid != nil && *record.RepoDid != "" { 271 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 272 if err != nil && !errors.Is(err, sql.ErrNoRows) { 273 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 274 } 275 } 276 if repo == nil && record.Repo != nil { 277 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 278 if parseErr != nil { 279 return parseErr 280 } 281 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 282 if err != nil { 283 return err 284 } 285 } 286 if repo == nil { 287 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 288 } 289 290 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 291 if err != nil || !ok { 292 return err 293 } 294 295 repoDid := repo.RepoDid 296 if repoDid == "" && record.RepoDid != nil { 297 repoDid = *record.RepoDid 298 } 299 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 300 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil { 301 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 302 } 303 } 304 305 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 306 if err != nil { 307 createdAt = time.Now() 308 } 309 310 artifact := models.Artifact{ 311 Did: did, 312 Rkey: e.Commit.RKey, 313 RepoAt: repo.RepoAt(), 314 Tag: plumbing.Hash(record.Tag), 315 CreatedAt: createdAt, 316 BlobCid: cid.Cid(record.Artifact.Ref), 317 Name: record.Name, 318 Size: uint64(record.Artifact.Size), 319 MimeType: record.Artifact.MimeType, 320 } 321 322 err = db.AddArtifact(i.Db, artifact) 323 case jmodels.CommitOperationDelete: 324 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 325 } 326 327 if err != nil { 328 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 329 } 330 331 return nil 332} 333 334func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error { 335 did := e.Did 336 var err error 337 338 l := i.Logger.With("handler", "ingestProfile") 339 l = l.With("nsid", e.Commit.Collection) 340 341 if e.Commit.RKey != "self" { 342 return fmt.Errorf("ingestProfile only ingests `self` record") 343 } 344 345 switch e.Commit.Operation { 346 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 347 raw := json.RawMessage(e.Commit.Record) 348 record := tangled.ActorProfile{} 349 err = json.Unmarshal(raw, &record) 350 if err != nil { 351 l.Error("invalid record", "err", err) 352 return err 353 } 354 355 avatar := "" 356 if record.Avatar != nil { 357 avatar = record.Avatar.Ref.String() 358 } 359 360 description := "" 361 if record.Description != nil { 362 description = *record.Description 363 } 364 365 includeBluesky := record.Bluesky 366 367 pronouns := "" 368 if record.Pronouns != nil { 369 pronouns = *record.Pronouns 370 } 371 372 location := "" 373 if record.Location != nil { 374 location = *record.Location 375 } 376 377 var links [5]string 378 for i, l := range record.Links { 379 if i < 5 { 380 links[i] = l 381 } 382 } 383 384 var stats [2]models.VanityStat 385 for i, s := range record.Stats { 386 if i < 2 { 387 stats[i].Kind = models.ParseVanityStatKind(s) 388 } 389 } 390 391 var pinned [6]string 392 for i, r := range record.PinnedRepositories { 393 if i < 6 { 394 pinned[i] = r 395 } 396 } 397 398 var preferredHandle syntax.Handle 399 if record.PreferredHandle != nil { 400 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 401 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 402 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 403 preferredHandle = h 404 } 405 } 406 } 407 408 profile := models.Profile{ 409 Did: did, 410 Avatar: avatar, 411 Description: description, 412 IncludeBluesky: includeBluesky, 413 Location: location, 414 Links: links, 415 Stats: stats, 416 PinnedRepos: pinned, 417 Pronouns: pronouns, 418 PreferredHandle: preferredHandle, 419 } 420 421 ddb, ok := i.Db.Execer.(*db.DB) 422 if !ok { 423 return fmt.Errorf("failed to index profile record, invalid db cast") 424 } 425 426 tx, err := ddb.Begin() 427 if err != nil { 428 return fmt.Errorf("failed to start transaction") 429 } 430 431 err = db.ValidateProfile(tx, &profile) 432 if err != nil { 433 return fmt.Errorf("invalid profile record") 434 } 435 436 err = db.UpsertProfile(tx, &profile) 437 case jmodels.CommitOperationDelete: 438 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 439 } 440 441 if err != nil { 442 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 443 } 444 445 return nil 446} 447 448func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 449 did := e.Did 450 var err error 451 452 l := i.Logger.With("handler", "ingestSpindleMember") 453 l = l.With("nsid", e.Commit.Collection) 454 455 switch e.Commit.Operation { 456 case jmodels.CommitOperationCreate: 457 raw := json.RawMessage(e.Commit.Record) 458 record := tangled.SpindleMember{} 459 err = json.Unmarshal(raw, &record) 460 if err != nil { 461 l.Error("invalid record", "err", err) 462 return err 463 } 464 465 // only spindle owner can invite to spindles 466 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 467 if err != nil || !ok { 468 return fmt.Errorf("failed to enforce permissions: %w", err) 469 } 470 471 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 472 if err != nil { 473 return err 474 } 475 476 if memberId.Handle.IsInvalidHandle() { 477 return err 478 } 479 480 ddb, ok := i.Db.Execer.(*db.DB) 481 if !ok { 482 return fmt.Errorf("invalid db cast") 483 } 484 485 err = db.AddSpindleMember(ddb, models.SpindleMember{ 486 Did: syntax.DID(did), 487 Rkey: e.Commit.RKey, 488 Instance: record.Instance, 489 Subject: memberId.DID, 490 }) 491 if !ok { 492 return fmt.Errorf("failed to add to db: %w", err) 493 } 494 495 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 496 if err != nil { 497 return fmt.Errorf("failed to update ACLs: %w", err) 498 } 499 500 l.Info("added spindle member") 501 case jmodels.CommitOperationDelete: 502 rkey := e.Commit.RKey 503 504 ddb, ok := i.Db.Execer.(*db.DB) 505 if !ok { 506 return fmt.Errorf("failed to index profile record, invalid db cast") 507 } 508 509 // get record from db first 510 members, err := db.GetSpindleMembers( 511 ddb, 512 orm.FilterEq("did", did), 513 orm.FilterEq("rkey", rkey), 514 ) 515 if err != nil || len(members) != 1 { 516 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 517 } 518 member := members[0] 519 520 tx, err := ddb.Begin() 521 if err != nil { 522 return fmt.Errorf("failed to start txn: %w", err) 523 } 524 525 // remove record by rkey && update enforcer 526 if err = db.RemoveSpindleMember( 527 tx, 528 orm.FilterEq("did", did), 529 orm.FilterEq("rkey", rkey), 530 ); err != nil { 531 return fmt.Errorf("failed to remove from db: %w", err) 532 } 533 534 // update enforcer 535 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 536 if err != nil { 537 return fmt.Errorf("failed to update ACLs: %w", err) 538 } 539 540 if err = tx.Commit(); err != nil { 541 return fmt.Errorf("failed to commit txn: %w", err) 542 } 543 544 if err = i.Enforcer.E.SavePolicy(); err != nil { 545 return fmt.Errorf("failed to save ACLs: %w", err) 546 } 547 548 l.Info("removed spindle member") 549 } 550 551 return nil 552} 553 554func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 555 did := e.Did 556 var err error 557 558 l := i.Logger.With("handler", "ingestSpindle") 559 l = l.With("nsid", e.Commit.Collection) 560 561 switch e.Commit.Operation { 562 case jmodels.CommitOperationCreate: 563 raw := json.RawMessage(e.Commit.Record) 564 record := tangled.Spindle{} 565 err = json.Unmarshal(raw, &record) 566 if err != nil { 567 l.Error("invalid record", "err", err) 568 return err 569 } 570 571 instance := e.Commit.RKey 572 573 ddb, ok := i.Db.Execer.(*db.DB) 574 if !ok { 575 return fmt.Errorf("failed to index profile record, invalid db cast") 576 } 577 578 err := db.AddSpindle(ddb, models.Spindle{ 579 Owner: syntax.DID(did), 580 Instance: instance, 581 }) 582 if err != nil { 583 l.Error("failed to add spindle to db", "err", err, "instance", instance) 584 return err 585 } 586 587 err = retry.Do( 588 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 589 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 590 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 591 ) 592 if err != nil { 593 l.Error("failed to verify spindle after retries", "err", err, "instance", instance) 594 return err 595 } 596 597 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 598 if err != nil { 599 return fmt.Errorf("failed to mark verified: %w", err) 600 } 601 602 return nil 603 604 case jmodels.CommitOperationDelete: 605 instance := e.Commit.RKey 606 607 ddb, ok := i.Db.Execer.(*db.DB) 608 if !ok { 609 return fmt.Errorf("failed to index profile record, invalid db cast") 610 } 611 612 // get record from db first 613 spindles, err := db.GetSpindles( 614 ctx, 615 ddb, 616 orm.FilterEq("owner", did), 617 orm.FilterEq("instance", instance), 618 ) 619 if err != nil || len(spindles) != 1 { 620 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 621 } 622 spindle := spindles[0] 623 624 tx, err := ddb.Begin() 625 if err != nil { 626 return err 627 } 628 defer func() { 629 tx.Rollback() 630 i.Enforcer.E.LoadPolicy() 631 }() 632 633 // remove spindle members first 634 err = db.RemoveSpindleMember( 635 tx, 636 orm.FilterEq("owner", did), 637 orm.FilterEq("instance", instance), 638 ) 639 if err != nil { 640 return err 641 } 642 643 err = db.DeleteSpindle( 644 tx, 645 orm.FilterEq("owner", did), 646 orm.FilterEq("instance", instance), 647 ) 648 if err != nil { 649 return err 650 } 651 652 if spindle.Verified != nil { 653 err = i.Enforcer.RemoveSpindle(instance) 654 if err != nil { 655 return err 656 } 657 } 658 659 err = tx.Commit() 660 if err != nil { 661 return err 662 } 663 664 err = i.Enforcer.E.SavePolicy() 665 if err != nil { 666 return err 667 } 668 } 669 670 return nil 671} 672 673func (i *Ingester) ingestString(e *jmodels.Event) error { 674 did := e.Did 675 rkey := e.Commit.RKey 676 677 var err error 678 679 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 680 l.Info("ingesting record") 681 682 ddb, ok := i.Db.Execer.(*db.DB) 683 if !ok { 684 return fmt.Errorf("failed to index string record, invalid db cast") 685 } 686 687 switch e.Commit.Operation { 688 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 689 raw := json.RawMessage(e.Commit.Record) 690 record := tangled.String{} 691 err = json.Unmarshal(raw, &record) 692 if err != nil { 693 l.Error("invalid record", "err", err) 694 return err 695 } 696 697 string := models.StringFromRecord(did, rkey, record) 698 699 if err = i.Validator.ValidateString(&string); err != nil { 700 l.Error("invalid record", "err", err) 701 return err 702 } 703 704 if err = db.AddString(ddb, string); err != nil { 705 l.Error("failed to add string", "err", err) 706 return err 707 } 708 709 return nil 710 711 case jmodels.CommitOperationDelete: 712 if err := db.DeleteString( 713 ddb, 714 orm.FilterEq("did", did), 715 orm.FilterEq("rkey", rkey), 716 ); err != nil { 717 l.Error("failed to delete", "err", err) 718 return fmt.Errorf("failed to delete string record: %w", err) 719 } 720 721 return nil 722 } 723 724 return nil 725} 726 727func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 728 did := e.Did 729 var err error 730 731 l := i.Logger.With("handler", "ingestKnotMember") 732 l = l.With("nsid", e.Commit.Collection) 733 734 switch e.Commit.Operation { 735 case jmodels.CommitOperationCreate: 736 raw := json.RawMessage(e.Commit.Record) 737 record := tangled.KnotMember{} 738 err = json.Unmarshal(raw, &record) 739 if err != nil { 740 l.Error("invalid record", "err", err) 741 return err 742 } 743 744 // only knot owner can invite to knots 745 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 746 if err != nil || !ok { 747 return fmt.Errorf("failed to enforce permissions: %w", err) 748 } 749 750 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 751 if err != nil { 752 return err 753 } 754 755 if memberId.Handle.IsInvalidHandle() { 756 return err 757 } 758 759 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 760 if err != nil { 761 return fmt.Errorf("failed to update ACLs: %w", err) 762 } 763 764 l.Info("added knot member") 765 case jmodels.CommitOperationDelete: 766 // we don't store knot members in a table (like we do for spindle) 767 // and we can't remove this just yet. possibly fixed if we switch 768 // to either: 769 // 1. a knot_members table like with spindle and store the rkey 770 // 2. use the knot host as the rkey 771 // 772 // TODO: implement member deletion 773 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 774 } 775 776 return nil 777} 778 779func (i *Ingester) ingestKnot(e *jmodels.Event) error { 780 did := e.Did 781 var err error 782 783 l := i.Logger.With("handler", "ingestKnot") 784 l = l.With("nsid", e.Commit.Collection) 785 786 switch e.Commit.Operation { 787 case jmodels.CommitOperationCreate: 788 raw := json.RawMessage(e.Commit.Record) 789 record := tangled.Knot{} 790 err = json.Unmarshal(raw, &record) 791 if err != nil { 792 l.Error("invalid record", "err", err) 793 return err 794 } 795 796 domain := e.Commit.RKey 797 798 ddb, ok := i.Db.Execer.(*db.DB) 799 if !ok { 800 return fmt.Errorf("failed to index profile record, invalid db cast") 801 } 802 803 err := db.AddKnot(ddb, domain, did) 804 if err != nil { 805 l.Error("failed to add knot to db", "err", err, "domain", domain) 806 return err 807 } 808 809 err = retry.Do( 810 func() error { 811 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 812 }, 813 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 814 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 815 ) 816 if err != nil { 817 l.Error("failed to verify knot after retries", "err", err, "domain", domain) 818 return err 819 } 820 821 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 822 if err != nil { 823 return fmt.Errorf("failed to mark verified: %w", err) 824 } 825 826 return nil 827 828 case jmodels.CommitOperationDelete: 829 domain := e.Commit.RKey 830 831 ddb, ok := i.Db.Execer.(*db.DB) 832 if !ok { 833 return fmt.Errorf("failed to index knot record, invalid db cast") 834 } 835 836 // get record from db first 837 registrations, err := db.GetRegistrations( 838 ddb, 839 orm.FilterEq("domain", domain), 840 orm.FilterEq("did", did), 841 ) 842 if err != nil { 843 return fmt.Errorf("failed to get registration: %w", err) 844 } 845 if len(registrations) != 1 { 846 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations)) 847 } 848 registration := registrations[0] 849 850 tx, err := ddb.Begin() 851 if err != nil { 852 return err 853 } 854 defer func() { 855 tx.Rollback() 856 i.Enforcer.E.LoadPolicy() 857 }() 858 859 err = db.DeleteKnot( 860 tx, 861 orm.FilterEq("did", did), 862 orm.FilterEq("domain", domain), 863 ) 864 if err != nil { 865 return err 866 } 867 868 if registration.Registered != nil { 869 err = i.Enforcer.RemoveKnot(domain) 870 if err != nil { 871 return err 872 } 873 } 874 875 err = tx.Commit() 876 if err != nil { 877 return err 878 } 879 880 err = i.Enforcer.E.SavePolicy() 881 if err != nil { 882 return err 883 } 884 } 885 886 return nil 887} 888func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 889 did := e.Did 890 rkey := e.Commit.RKey 891 892 var err error 893 894 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 895 l.Info("ingesting record") 896 897 ddb, ok := i.Db.Execer.(*db.DB) 898 if !ok { 899 return fmt.Errorf("failed to index issue record, invalid db cast") 900 } 901 902 switch e.Commit.Operation { 903 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 904 raw := json.RawMessage(e.Commit.Record) 905 record := tangled.RepoIssue{} 906 err = json.Unmarshal(raw, &record) 907 if err != nil { 908 l.Error("invalid record", "err", err) 909 return err 910 } 911 912 issue := models.IssueFromRecord(did, rkey, record) 913 914 if issue.RepoAt == "" { 915 return fmt.Errorf("issue record has no repo field") 916 } 917 918 if err := i.Validator.ValidateIssue(&issue); err != nil { 919 return fmt.Errorf("failed to validate issue: %w", err) 920 } 921 922 if record.Repo != nil { 923 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo) 924 if repoErr == nil && repo.RepoDid != "" { 925 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil { 926 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 927 } 928 } 929 } 930 931 tx, err := ddb.BeginTx(ctx, nil) 932 if err != nil { 933 l.Error("failed to begin transaction", "err", err) 934 return err 935 } 936 defer tx.Rollback() 937 938 err = db.PutIssue(tx, &issue) 939 if err != nil { 940 l.Error("failed to create issue", "err", err) 941 return err 942 } 943 944 err = tx.Commit() 945 if err != nil { 946 l.Error("failed to commit txn", "err", err) 947 return err 948 } 949 950 return nil 951 952 case jmodels.CommitOperationDelete: 953 tx, err := ddb.BeginTx(ctx, nil) 954 if err != nil { 955 l.Error("failed to begin transaction", "err", err) 956 return err 957 } 958 defer tx.Rollback() 959 960 if err := db.DeleteIssues( 961 tx, 962 did, 963 rkey, 964 ); err != nil { 965 l.Error("failed to delete", "err", err) 966 return fmt.Errorf("failed to delete issue record: %w", err) 967 } 968 if err := tx.Commit(); err != nil { 969 l.Error("failed to commit txn", "err", err) 970 return err 971 } 972 973 return nil 974 } 975 976 return nil 977} 978 979func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error { 980 did := e.Did 981 rkey := e.Commit.RKey 982 983 var err error 984 985 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 986 l.Info("ingesting record") 987 988 ddb, ok := i.Db.Execer.(*db.DB) 989 if !ok { 990 return fmt.Errorf("failed to index pull record, invalid db cast") 991 } 992 993 switch e.Commit.Operation { 994 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 995 raw := json.RawMessage(e.Commit.Record) 996 record := tangled.RepoPull{} 997 err = json.Unmarshal(raw, &record) 998 if err != nil { 999 l.Error("invalid record", "err", err) 1000 return err 1001 } 1002 1003 ownerId, err := i.IdResolver.ResolveIdent(ctx, did) 1004 if err != nil { 1005 l.Error("failed to resolve did") 1006 return err 1007 } 1008 1009 // go through and fetch all blobs in parallel 1010 readers := make([]*io.ReadCloser, len(record.Rounds)) 1011 var mu sync.Mutex 1012 1013 g, gctx := errgroup.WithContext(ctx) 1014 1015 for idx, b := range record.Rounds { 1016 g.Go(func() error { 1017 // for some reason, a blob is empty 1018 if b.PatchBlob == nil { 1019 return fmt.Errorf("missing patchBlob in round %d", idx) 1020 } 1021 1022 ownerPds := ownerId.PDSEndpoint() 1023 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 1024 q := url.Query() 1025 q.Set("cid", b.PatchBlob.Ref.String()) 1026 q.Set("did", did) 1027 url.RawQuery = q.Encode() 1028 1029 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil) 1030 if err != nil { 1031 l.Error("failed to create request") 1032 return err 1033 } 1034 req.Header.Set("Content-Type", "application/json") 1035 1036 resp, err := http.DefaultClient.Do(req) 1037 if err != nil { 1038 l.Error("failed to make request") 1039 return err 1040 } 1041 1042 mu.Lock() 1043 readers[idx] = &resp.Body 1044 mu.Unlock() 1045 1046 return nil 1047 }) 1048 } 1049 1050 if err := g.Wait(); err != nil { 1051 for _, r := range readers { 1052 if r != nil && *r != nil { 1053 (*r).Close() 1054 } 1055 } 1056 return err 1057 } 1058 1059 defer func() { 1060 for _, r := range readers { 1061 if r != nil && *r != nil { 1062 (*r).Close() 1063 } 1064 } 1065 }() 1066 1067 pull, err := models.PullFromRecord(did, rkey, record, readers) 1068 if err != nil { 1069 return fmt.Errorf("failed to parse pull from record: %w", err) 1070 } 1071 if err := i.Validator.ValidatePull(pull); err != nil { 1072 return fmt.Errorf("failed to validate pull: %w", err) 1073 } 1074 1075 tx, err := ddb.BeginTx(ctx, nil) 1076 if err != nil { 1077 l.Error("failed to begin transaction", "err", err) 1078 return err 1079 } 1080 defer tx.Rollback() 1081 1082 err = db.PutPull(tx, pull) 1083 if err != nil { 1084 l.Error("failed to create pull", "err", err) 1085 return err 1086 } 1087 1088 err = tx.Commit() 1089 if err != nil { 1090 l.Error("failed to commit txn", "err", err) 1091 return err 1092 } 1093 1094 return nil 1095 1096 case jmodels.CommitOperationDelete: 1097 tx, err := ddb.BeginTx(ctx, nil) 1098 if err != nil { 1099 l.Error("failed to begin transaction", "err", err) 1100 return err 1101 } 1102 defer tx.Rollback() 1103 1104 if err := db.AbandonPulls( 1105 tx, 1106 orm.FilterEq("owner_did", did), 1107 orm.FilterEq("rkey", rkey), 1108 ); err != nil { 1109 l.Error("failed to abandon", "err", err) 1110 return fmt.Errorf("failed to abandon pull record: %w", err) 1111 } 1112 if err := tx.Commit(); err != nil { 1113 l.Error("failed to commit txn", "err", err) 1114 return err 1115 } 1116 1117 return nil 1118 } 1119 1120 return nil 1121} 1122 1123func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 1124 did := e.Did 1125 rkey := e.Commit.RKey 1126 1127 var err error 1128 1129 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1130 l.Info("ingesting record") 1131 1132 ddb, ok := i.Db.Execer.(*db.DB) 1133 if !ok { 1134 return fmt.Errorf("failed to index issue comment record, invalid db cast") 1135 } 1136 1137 switch e.Commit.Operation { 1138 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1139 raw := json.RawMessage(e.Commit.Record) 1140 record := tangled.RepoIssueComment{} 1141 err = json.Unmarshal(raw, &record) 1142 if err != nil { 1143 return fmt.Errorf("invalid record: %w", err) 1144 } 1145 1146 comment, err := models.IssueCommentFromRecord(did, rkey, record) 1147 if err != nil { 1148 return fmt.Errorf("failed to parse comment from record: %w", err) 1149 } 1150 1151 if err := i.Validator.ValidateIssueComment(comment); err != nil { 1152 return fmt.Errorf("failed to validate comment: %w", err) 1153 } 1154 1155 tx, err := ddb.Begin() 1156 if err != nil { 1157 return fmt.Errorf("failed to start transaction: %w", err) 1158 } 1159 defer tx.Rollback() 1160 1161 _, err = db.AddIssueComment(tx, *comment) 1162 if err != nil { 1163 return fmt.Errorf("failed to create issue comment: %w", err) 1164 } 1165 1166 return tx.Commit() 1167 1168 case jmodels.CommitOperationDelete: 1169 if err := db.DeleteIssueComments( 1170 ddb, 1171 orm.FilterEq("did", did), 1172 orm.FilterEq("rkey", rkey), 1173 ); err != nil { 1174 return fmt.Errorf("failed to delete issue comment record: %w", err) 1175 } 1176 1177 return nil 1178 } 1179 1180 return nil 1181} 1182 1183func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 1184 did := e.Did 1185 rkey := e.Commit.RKey 1186 1187 var err error 1188 1189 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1190 l.Info("ingesting record") 1191 1192 ddb, ok := i.Db.Execer.(*db.DB) 1193 if !ok { 1194 return fmt.Errorf("failed to index label definition, invalid db cast") 1195 } 1196 1197 switch e.Commit.Operation { 1198 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1199 raw := json.RawMessage(e.Commit.Record) 1200 record := tangled.LabelDefinition{} 1201 err = json.Unmarshal(raw, &record) 1202 if err != nil { 1203 return fmt.Errorf("invalid record: %w", err) 1204 } 1205 1206 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1207 if err != nil { 1208 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1209 } 1210 1211 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1212 return fmt.Errorf("failed to validate labeldef: %w", err) 1213 } 1214 1215 _, err = db.AddLabelDefinition(ddb, def) 1216 if err != nil { 1217 return fmt.Errorf("failed to create labeldef: %w", err) 1218 } 1219 1220 return nil 1221 1222 case jmodels.CommitOperationDelete: 1223 if err := db.DeleteLabelDefinition( 1224 ddb, 1225 orm.FilterEq("did", did), 1226 orm.FilterEq("rkey", rkey), 1227 ); err != nil { 1228 return fmt.Errorf("failed to delete labeldef record: %w", err) 1229 } 1230 1231 return nil 1232 } 1233 1234 return nil 1235} 1236 1237func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 1238 did := e.Did 1239 rkey := e.Commit.RKey 1240 1241 var err error 1242 1243 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1244 l.Info("ingesting record") 1245 1246 ddb, ok := i.Db.Execer.(*db.DB) 1247 if !ok { 1248 return fmt.Errorf("failed to index label op, invalid db cast") 1249 } 1250 1251 switch e.Commit.Operation { 1252 case jmodels.CommitOperationCreate: 1253 raw := json.RawMessage(e.Commit.Record) 1254 record := tangled.LabelOp{} 1255 err = json.Unmarshal(raw, &record) 1256 if err != nil { 1257 return fmt.Errorf("invalid record: %w", err) 1258 } 1259 1260 subject := syntax.ATURI(record.Subject) 1261 collection := subject.Collection() 1262 1263 var repo *models.Repo 1264 switch collection { 1265 case tangled.RepoIssueNSID: 1266 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1267 if err != nil || len(i) != 1 { 1268 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1269 } 1270 repo = i[0].Repo 1271 default: 1272 return fmt.Errorf("unsupported label subject: %s", collection) 1273 } 1274 1275 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1276 if err != nil { 1277 return fmt.Errorf("failed to build label application ctx: %w", err) 1278 } 1279 1280 ops := models.LabelOpsFromRecord(did, rkey, record) 1281 1282 for _, o := range ops { 1283 def, ok := actx.Defs[o.OperandKey] 1284 if !ok { 1285 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1286 } 1287 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1288 return fmt.Errorf("failed to validate labelop: %w", err) 1289 } 1290 } 1291 1292 tx, err := ddb.Begin() 1293 if err != nil { 1294 return err 1295 } 1296 defer tx.Rollback() 1297 1298 for _, o := range ops { 1299 _, err = db.AddLabelOp(tx, &o) 1300 if err != nil { 1301 return fmt.Errorf("failed to add labelop: %w", err) 1302 } 1303 } 1304 1305 if err = tx.Commit(); err != nil { 1306 return err 1307 } 1308 } 1309 1310 return nil 1311}