Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eti/gitignore 1326 lines 33 kB view raw
1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "net/http" 13 "net/url" 14 "slices" 15 "sync" 16 17 "time" 18 19 "github.com/avast/retry-go/v4" 20 "github.com/bluesky-social/indigo/atproto/syntax" 21 jmodels "github.com/bluesky-social/jetstream/pkg/models" 22 "github.com/go-git/go-git/v5/plumbing" 23 "github.com/ipfs/go-cid" 24 "golang.org/x/sync/errgroup" 25 "tangled.org/core/api/tangled" 26 "tangled.org/core/appview/cache" 27 "tangled.org/core/appview/config" 28 "tangled.org/core/appview/db" 29 "tangled.org/core/appview/models" 30 "tangled.org/core/appview/serververify" 31 "tangled.org/core/appview/validator" 32 "tangled.org/core/idresolver" 33 "tangled.org/core/orm" 34 "tangled.org/core/rbac" 35) 36 37type Ingester struct { 38 Db db.DbWrapper 39 Enforcer *rbac.Enforcer 40 IdResolver *idresolver.Resolver 41 Cache *cache.Cache 42 Config *config.Config 43 Logger *slog.Logger 44 Validator *validator.Validator 45} 46 47type processFunc func(ctx context.Context, e *jmodels.Event) error 48 49func (i *Ingester) Ingest() processFunc { 50 return func(ctx context.Context, e *jmodels.Event) error { 51 var err error 52 53 l := i.Logger.With("kind", e.Kind) 54 switch e.Kind { 55 case jmodels.EventKindAccount: 56 // TODO: sync account state to db 57 if e.Account.Active { 58 break 59 } 60 // TODO: revoke sessions by DID 61 if *e.Account.Status == "deactivated" { 62 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 63 } 64 case jmodels.EventKindIdentity: 65 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 66 case jmodels.EventKindCommit: 67 switch e.Commit.Collection { 68 case tangled.GraphFollowNSID: 69 err = i.ingestFollow(e) 70 case tangled.FeedStarNSID: 71 err = i.ingestStar(e) 72 case tangled.PublicKeyNSID: 73 err = i.ingestPublicKey(e) 74 case tangled.RepoArtifactNSID: 75 err = i.ingestArtifact(e) 76 case tangled.ActorProfileNSID: 77 err = i.ingestProfile(ctx, e) 78 case tangled.SpindleMemberNSID: 79 err = i.ingestSpindleMember(ctx, e) 80 case tangled.SpindleNSID: 81 err = i.ingestSpindle(ctx, e) 82 case tangled.KnotMemberNSID: 83 err = i.ingestKnotMember(e) 84 case tangled.KnotNSID: 85 err = i.ingestKnot(e) 86 case tangled.StringNSID: 87 err = i.ingestString(e) 88 case tangled.RepoIssueNSID: 89 err = i.ingestIssue(ctx, e) 90 case tangled.RepoPullNSID: 91 err = i.ingestPull(ctx, e) 92 case tangled.RepoIssueCommentNSID: 93 err = i.ingestIssueComment(e) 94 case tangled.LabelDefinitionNSID: 95 err = i.ingestLabelDefinition(e) 96 case tangled.LabelOpNSID: 97 err = i.ingestLabelOp(e) 98 } 99 l = i.Logger.With("nsid", e.Commit.Collection) 100 } 101 102 if err != nil { 103 l.Warn("failed to ingest record, skipping", "err", err) 104 } 105 106 lastTimeUs := e.TimeUS + 1 107 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 108 l.Error("failed to save cursor", "err", saveErr) 109 } 110 111 return nil 112 } 113} 114 115func (i *Ingester) ingestStar(e *jmodels.Event) error { 116 var err error 117 did := e.Did 118 119 l := i.Logger.With("handler", "ingestStar") 120 l = l.With("nsid", e.Commit.Collection) 121 122 switch e.Commit.Operation { 123 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 124 var subjectUri syntax.ATURI 125 126 raw := json.RawMessage(e.Commit.Record) 127 record := tangled.FeedStar{} 128 err := json.Unmarshal(raw, &record) 129 if err != nil { 130 l.Error("invalid record", "err", err) 131 return err 132 } 133 134 star := &models.Star{ 135 Did: did, 136 Rkey: e.Commit.RKey, 137 } 138 139 switch { 140 case record.SubjectDid != nil: 141 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid)) 142 if repoErr == nil { 143 subjectUri = repo.RepoAt() 144 star.RepoAt = subjectUri 145 } 146 case record.Subject != nil: 147 subjectUri, err = syntax.ParseATURI(*record.Subject) 148 if err != nil { 149 l.Error("invalid record", "err", err) 150 return err 151 } 152 star.RepoAt = subjectUri 153 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String()) 154 if repoErr == nil && repo.RepoDid != "" { 155 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil { 156 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 157 } 158 } 159 default: 160 l.Error("star record has neither subject nor subjectDid") 161 return fmt.Errorf("star record has neither subject nor subjectDid") 162 } 163 err = db.AddStar(i.Db, star) 164 case jmodels.CommitOperationDelete: 165 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 166 } 167 168 if err != nil { 169 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 170 } 171 172 return nil 173} 174 175func (i *Ingester) ingestFollow(e *jmodels.Event) error { 176 var err error 177 did := e.Did 178 179 l := i.Logger.With("handler", "ingestFollow") 180 l = l.With("nsid", e.Commit.Collection) 181 182 switch e.Commit.Operation { 183 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 184 raw := json.RawMessage(e.Commit.Record) 185 record := tangled.GraphFollow{} 186 err = json.Unmarshal(raw, &record) 187 if err != nil { 188 l.Error("invalid record", "err", err) 189 return err 190 } 191 192 err = db.AddFollow(i.Db, &models.Follow{ 193 UserDid: did, 194 SubjectDid: record.Subject, 195 Rkey: e.Commit.RKey, 196 }) 197 case jmodels.CommitOperationDelete: 198 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 199 } 200 201 if err != nil { 202 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 203 } 204 205 return nil 206} 207 208func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 209 did := e.Did 210 var err error 211 212 l := i.Logger.With("handler", "ingestPublicKey") 213 l = l.With("nsid", e.Commit.Collection) 214 215 switch e.Commit.Operation { 216 case jmodels.CommitOperationCreate: 217 l.Debug("processing add of pubkey") 218 raw := json.RawMessage(e.Commit.Record) 219 record := tangled.PublicKey{} 220 err = json.Unmarshal(raw, &record) 221 if err != nil { 222 l.Error("invalid record", "err", err) 223 return err 224 } 225 226 name := record.Name 227 key := record.Key 228 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 229 case jmodels.CommitOperationUpdate: 230 l.Debug("processing update of pubkey") 231 raw := json.RawMessage(e.Commit.Record) 232 record := tangled.PublicKey{} 233 err = json.Unmarshal(raw, &record) 234 if err != nil { 235 l.Error("invalid record", "err", err) 236 return err 237 } 238 239 name := record.Name 240 key := record.Key 241 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey) 242 case jmodels.CommitOperationDelete: 243 l.Debug("processing delete of pubkey") 244 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 245 } 246 247 if err != nil { 248 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 249 } 250 251 return nil 252} 253 254func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 255 did := e.Did 256 var err error 257 258 l := i.Logger.With("handler", "ingestArtifact") 259 l = l.With("nsid", e.Commit.Collection) 260 261 switch e.Commit.Operation { 262 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 263 raw := json.RawMessage(e.Commit.Record) 264 record := tangled.RepoArtifact{} 265 err = json.Unmarshal(raw, &record) 266 if err != nil { 267 l.Error("invalid record", "err", err) 268 return err 269 } 270 271 var repo *models.Repo 272 if record.RepoDid != nil && *record.RepoDid != "" { 273 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 274 if err != nil && !errors.Is(err, sql.ErrNoRows) { 275 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 276 } 277 } 278 if repo == nil && record.Repo != nil { 279 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 280 if parseErr != nil { 281 return parseErr 282 } 283 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 284 if err != nil { 285 return err 286 } 287 } 288 if repo == nil { 289 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 290 } 291 292 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 293 if err != nil || !ok { 294 return err 295 } 296 297 repoDid := repo.RepoDid 298 if repoDid == "" && record.RepoDid != nil { 299 repoDid = *record.RepoDid 300 } 301 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 302 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil { 303 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 304 } 305 } 306 307 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 308 if err != nil { 309 createdAt = time.Now() 310 } 311 312 artifact := models.Artifact{ 313 Did: did, 314 Rkey: e.Commit.RKey, 315 RepoAt: repo.RepoAt(), 316 Tag: plumbing.Hash(record.Tag), 317 CreatedAt: createdAt, 318 BlobCid: cid.Cid(record.Artifact.Ref), 319 Name: record.Name, 320 Size: uint64(record.Artifact.Size), 321 MimeType: record.Artifact.MimeType, 322 } 323 324 err = db.AddArtifact(i.Db, artifact) 325 case jmodels.CommitOperationDelete: 326 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 327 } 328 329 if err != nil { 330 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 331 } 332 333 return nil 334} 335 336func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error { 337 did := e.Did 338 var err error 339 340 l := i.Logger.With("handler", "ingestProfile") 341 l = l.With("nsid", e.Commit.Collection) 342 343 if e.Commit.RKey != "self" { 344 return fmt.Errorf("ingestProfile only ingests `self` record") 345 } 346 347 switch e.Commit.Operation { 348 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 349 raw := json.RawMessage(e.Commit.Record) 350 record := tangled.ActorProfile{} 351 err = json.Unmarshal(raw, &record) 352 if err != nil { 353 l.Error("invalid record", "err", err) 354 return err 355 } 356 357 avatar := "" 358 if record.Avatar != nil { 359 avatar = record.Avatar.Ref.String() 360 } 361 362 description := "" 363 if record.Description != nil { 364 description = *record.Description 365 } 366 367 includeBluesky := record.Bluesky 368 369 pronouns := "" 370 if record.Pronouns != nil { 371 pronouns = *record.Pronouns 372 } 373 374 location := "" 375 if record.Location != nil { 376 location = *record.Location 377 } 378 379 var links [5]string 380 for i, l := range record.Links { 381 if i < 5 { 382 links[i] = l 383 } 384 } 385 386 var stats [2]models.VanityStat 387 for i, s := range record.Stats { 388 if i < 2 { 389 stats[i].Kind = models.ParseVanityStatKind(s) 390 } 391 } 392 393 var pinned [6]string 394 for i, r := range record.PinnedRepositories { 395 if i < 6 { 396 pinned[i] = r 397 } 398 } 399 400 var preferredHandle syntax.Handle 401 if record.PreferredHandle != nil { 402 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 403 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 404 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 405 preferredHandle = h 406 } 407 } 408 } 409 410 profile := models.Profile{ 411 Did: did, 412 Avatar: avatar, 413 Description: description, 414 IncludeBluesky: includeBluesky, 415 Location: location, 416 Links: links, 417 Stats: stats, 418 PinnedRepos: pinned, 419 Pronouns: pronouns, 420 PreferredHandle: preferredHandle, 421 } 422 423 ddb, ok := i.Db.Execer.(*db.DB) 424 if !ok { 425 return fmt.Errorf("failed to index profile record, invalid db cast") 426 } 427 428 tx, err := ddb.Begin() 429 if err != nil { 430 return fmt.Errorf("failed to start transaction") 431 } 432 433 err = db.ValidateProfile(tx, &profile) 434 if err != nil { 435 return fmt.Errorf("invalid profile record") 436 } 437 438 err = db.UpsertProfile(tx, &profile) 439 if err == nil && i.Cache != nil { 440 pipe := i.Cache.Pipeline() 441 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did) 442 if preferredHandle != "" { 443 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL) 444 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL) 445 } else { 446 pipe.Del(ctx, didKey) 447 } 448 if _, execErr := pipe.Exec(ctx); execErr != nil { 449 l.Warn("failed to update preferred handle cache", "err", execErr) 450 } 451 } 452 case jmodels.CommitOperationDelete: 453 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 454 } 455 456 if err != nil { 457 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 458 } 459 460 return nil 461} 462 463func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 464 did := e.Did 465 var err error 466 467 l := i.Logger.With("handler", "ingestSpindleMember") 468 l = l.With("nsid", e.Commit.Collection) 469 470 switch e.Commit.Operation { 471 case jmodels.CommitOperationCreate: 472 raw := json.RawMessage(e.Commit.Record) 473 record := tangled.SpindleMember{} 474 err = json.Unmarshal(raw, &record) 475 if err != nil { 476 l.Error("invalid record", "err", err) 477 return err 478 } 479 480 // only spindle owner can invite to spindles 481 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 482 if err != nil || !ok { 483 return fmt.Errorf("failed to enforce permissions: %w", err) 484 } 485 486 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 487 if err != nil { 488 return err 489 } 490 491 if memberId.Handle.IsInvalidHandle() { 492 return err 493 } 494 495 ddb, ok := i.Db.Execer.(*db.DB) 496 if !ok { 497 return fmt.Errorf("invalid db cast") 498 } 499 500 err = db.AddSpindleMember(ddb, models.SpindleMember{ 501 Did: syntax.DID(did), 502 Rkey: e.Commit.RKey, 503 Instance: record.Instance, 504 Subject: memberId.DID, 505 }) 506 if !ok { 507 return fmt.Errorf("failed to add to db: %w", err) 508 } 509 510 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 511 if err != nil { 512 return fmt.Errorf("failed to update ACLs: %w", err) 513 } 514 515 l.Info("added spindle member") 516 case jmodels.CommitOperationDelete: 517 rkey := e.Commit.RKey 518 519 ddb, ok := i.Db.Execer.(*db.DB) 520 if !ok { 521 return fmt.Errorf("failed to index profile record, invalid db cast") 522 } 523 524 // get record from db first 525 members, err := db.GetSpindleMembers( 526 ddb, 527 orm.FilterEq("did", did), 528 orm.FilterEq("rkey", rkey), 529 ) 530 if err != nil || len(members) != 1 { 531 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 532 } 533 member := members[0] 534 535 tx, err := ddb.Begin() 536 if err != nil { 537 return fmt.Errorf("failed to start txn: %w", err) 538 } 539 540 // remove record by rkey && update enforcer 541 if err = db.RemoveSpindleMember( 542 tx, 543 orm.FilterEq("did", did), 544 orm.FilterEq("rkey", rkey), 545 ); err != nil { 546 return fmt.Errorf("failed to remove from db: %w", err) 547 } 548 549 // update enforcer 550 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 551 if err != nil { 552 return fmt.Errorf("failed to update ACLs: %w", err) 553 } 554 555 if err = tx.Commit(); err != nil { 556 return fmt.Errorf("failed to commit txn: %w", err) 557 } 558 559 if err = i.Enforcer.E.SavePolicy(); err != nil { 560 return fmt.Errorf("failed to save ACLs: %w", err) 561 } 562 563 l.Info("removed spindle member") 564 } 565 566 return nil 567} 568 569func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 570 did := e.Did 571 var err error 572 573 l := i.Logger.With("handler", "ingestSpindle") 574 l = l.With("nsid", e.Commit.Collection) 575 576 switch e.Commit.Operation { 577 case jmodels.CommitOperationCreate: 578 raw := json.RawMessage(e.Commit.Record) 579 record := tangled.Spindle{} 580 err = json.Unmarshal(raw, &record) 581 if err != nil { 582 l.Error("invalid record", "err", err) 583 return err 584 } 585 586 instance := e.Commit.RKey 587 588 ddb, ok := i.Db.Execer.(*db.DB) 589 if !ok { 590 return fmt.Errorf("failed to index profile record, invalid db cast") 591 } 592 593 err := db.AddSpindle(ddb, models.Spindle{ 594 Owner: syntax.DID(did), 595 Instance: instance, 596 }) 597 if err != nil { 598 l.Error("failed to add spindle to db", "err", err, "instance", instance) 599 return err 600 } 601 602 err = retry.Do( 603 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 604 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 605 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 606 ) 607 if err != nil { 608 l.Error("failed to verify spindle after retries", "err", err, "instance", instance) 609 return err 610 } 611 612 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 613 if err != nil { 614 return fmt.Errorf("failed to mark verified: %w", err) 615 } 616 617 return nil 618 619 case jmodels.CommitOperationDelete: 620 instance := e.Commit.RKey 621 622 ddb, ok := i.Db.Execer.(*db.DB) 623 if !ok { 624 return fmt.Errorf("failed to index profile record, invalid db cast") 625 } 626 627 // get record from db first 628 spindles, err := db.GetSpindles( 629 ctx, 630 ddb, 631 orm.FilterEq("owner", did), 632 orm.FilterEq("instance", instance), 633 ) 634 if err != nil || len(spindles) != 1 { 635 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 636 } 637 spindle := spindles[0] 638 639 tx, err := ddb.Begin() 640 if err != nil { 641 return err 642 } 643 defer func() { 644 tx.Rollback() 645 i.Enforcer.E.LoadPolicy() 646 }() 647 648 // remove spindle members first 649 err = db.RemoveSpindleMember( 650 tx, 651 orm.FilterEq("owner", did), 652 orm.FilterEq("instance", instance), 653 ) 654 if err != nil { 655 return err 656 } 657 658 err = db.DeleteSpindle( 659 tx, 660 orm.FilterEq("owner", did), 661 orm.FilterEq("instance", instance), 662 ) 663 if err != nil { 664 return err 665 } 666 667 if spindle.Verified != nil { 668 err = i.Enforcer.RemoveSpindle(instance) 669 if err != nil { 670 return err 671 } 672 } 673 674 err = tx.Commit() 675 if err != nil { 676 return err 677 } 678 679 err = i.Enforcer.E.SavePolicy() 680 if err != nil { 681 return err 682 } 683 } 684 685 return nil 686} 687 688func (i *Ingester) ingestString(e *jmodels.Event) error { 689 did := e.Did 690 rkey := e.Commit.RKey 691 692 var err error 693 694 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 695 l.Info("ingesting record") 696 697 ddb, ok := i.Db.Execer.(*db.DB) 698 if !ok { 699 return fmt.Errorf("failed to index string record, invalid db cast") 700 } 701 702 switch e.Commit.Operation { 703 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 704 raw := json.RawMessage(e.Commit.Record) 705 record := tangled.String{} 706 err = json.Unmarshal(raw, &record) 707 if err != nil { 708 l.Error("invalid record", "err", err) 709 return err 710 } 711 712 string := models.StringFromRecord(did, rkey, record) 713 714 if err = i.Validator.ValidateString(&string); err != nil { 715 l.Error("invalid record", "err", err) 716 return err 717 } 718 719 if err = db.AddString(ddb, string); err != nil { 720 l.Error("failed to add string", "err", err) 721 return err 722 } 723 724 return nil 725 726 case jmodels.CommitOperationDelete: 727 if err := db.DeleteString( 728 ddb, 729 orm.FilterEq("did", did), 730 orm.FilterEq("rkey", rkey), 731 ); err != nil { 732 l.Error("failed to delete", "err", err) 733 return fmt.Errorf("failed to delete string record: %w", err) 734 } 735 736 return nil 737 } 738 739 return nil 740} 741 742func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 743 did := e.Did 744 var err error 745 746 l := i.Logger.With("handler", "ingestKnotMember") 747 l = l.With("nsid", e.Commit.Collection) 748 749 switch e.Commit.Operation { 750 case jmodels.CommitOperationCreate: 751 raw := json.RawMessage(e.Commit.Record) 752 record := tangled.KnotMember{} 753 err = json.Unmarshal(raw, &record) 754 if err != nil { 755 l.Error("invalid record", "err", err) 756 return err 757 } 758 759 // only knot owner can invite to knots 760 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 761 if err != nil || !ok { 762 return fmt.Errorf("failed to enforce permissions: %w", err) 763 } 764 765 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 766 if err != nil { 767 return err 768 } 769 770 if memberId.Handle.IsInvalidHandle() { 771 return err 772 } 773 774 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 775 if err != nil { 776 return fmt.Errorf("failed to update ACLs: %w", err) 777 } 778 779 l.Info("added knot member") 780 case jmodels.CommitOperationDelete: 781 // we don't store knot members in a table (like we do for spindle) 782 // and we can't remove this just yet. possibly fixed if we switch 783 // to either: 784 // 1. a knot_members table like with spindle and store the rkey 785 // 2. use the knot host as the rkey 786 // 787 // TODO: implement member deletion 788 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 789 } 790 791 return nil 792} 793 794func (i *Ingester) ingestKnot(e *jmodels.Event) error { 795 did := e.Did 796 var err error 797 798 l := i.Logger.With("handler", "ingestKnot") 799 l = l.With("nsid", e.Commit.Collection) 800 801 switch e.Commit.Operation { 802 case jmodels.CommitOperationCreate: 803 raw := json.RawMessage(e.Commit.Record) 804 record := tangled.Knot{} 805 err = json.Unmarshal(raw, &record) 806 if err != nil { 807 l.Error("invalid record", "err", err) 808 return err 809 } 810 811 domain := e.Commit.RKey 812 813 ddb, ok := i.Db.Execer.(*db.DB) 814 if !ok { 815 return fmt.Errorf("failed to index profile record, invalid db cast") 816 } 817 818 err := db.AddKnot(ddb, domain, did) 819 if err != nil { 820 l.Error("failed to add knot to db", "err", err, "domain", domain) 821 return err 822 } 823 824 err = retry.Do( 825 func() error { 826 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 827 }, 828 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 829 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 830 ) 831 if err != nil { 832 l.Error("failed to verify knot after retries", "err", err, "domain", domain) 833 return err 834 } 835 836 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 837 if err != nil { 838 return fmt.Errorf("failed to mark verified: %w", err) 839 } 840 841 return nil 842 843 case jmodels.CommitOperationDelete: 844 domain := e.Commit.RKey 845 846 ddb, ok := i.Db.Execer.(*db.DB) 847 if !ok { 848 return fmt.Errorf("failed to index knot record, invalid db cast") 849 } 850 851 // get record from db first 852 registrations, err := db.GetRegistrations( 853 ddb, 854 orm.FilterEq("domain", domain), 855 orm.FilterEq("did", did), 856 ) 857 if err != nil { 858 return fmt.Errorf("failed to get registration: %w", err) 859 } 860 if len(registrations) != 1 { 861 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations)) 862 } 863 registration := registrations[0] 864 865 tx, err := ddb.Begin() 866 if err != nil { 867 return err 868 } 869 defer func() { 870 tx.Rollback() 871 i.Enforcer.E.LoadPolicy() 872 }() 873 874 err = db.DeleteKnot( 875 tx, 876 orm.FilterEq("did", did), 877 orm.FilterEq("domain", domain), 878 ) 879 if err != nil { 880 return err 881 } 882 883 if registration.Registered != nil { 884 err = i.Enforcer.RemoveKnot(domain) 885 if err != nil { 886 return err 887 } 888 } 889 890 err = tx.Commit() 891 if err != nil { 892 return err 893 } 894 895 err = i.Enforcer.E.SavePolicy() 896 if err != nil { 897 return err 898 } 899 } 900 901 return nil 902} 903func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 904 did := e.Did 905 rkey := e.Commit.RKey 906 907 var err error 908 909 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 910 l.Info("ingesting record") 911 912 ddb, ok := i.Db.Execer.(*db.DB) 913 if !ok { 914 return fmt.Errorf("failed to index issue record, invalid db cast") 915 } 916 917 switch e.Commit.Operation { 918 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 919 raw := json.RawMessage(e.Commit.Record) 920 record := tangled.RepoIssue{} 921 err = json.Unmarshal(raw, &record) 922 if err != nil { 923 l.Error("invalid record", "err", err) 924 return err 925 } 926 927 issue := models.IssueFromRecord(did, rkey, record) 928 929 if issue.RepoAt == "" { 930 return fmt.Errorf("issue record has no repo field") 931 } 932 933 if err := i.Validator.ValidateIssue(&issue); err != nil { 934 return fmt.Errorf("failed to validate issue: %w", err) 935 } 936 937 if record.Repo != nil { 938 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo) 939 if repoErr == nil && repo.RepoDid != "" { 940 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil { 941 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 942 } 943 } 944 } 945 946 tx, err := ddb.BeginTx(ctx, nil) 947 if err != nil { 948 l.Error("failed to begin transaction", "err", err) 949 return err 950 } 951 defer tx.Rollback() 952 953 err = db.PutIssue(tx, &issue) 954 if err != nil { 955 l.Error("failed to create issue", "err", err) 956 return err 957 } 958 959 err = tx.Commit() 960 if err != nil { 961 l.Error("failed to commit txn", "err", err) 962 return err 963 } 964 965 return nil 966 967 case jmodels.CommitOperationDelete: 968 tx, err := ddb.BeginTx(ctx, nil) 969 if err != nil { 970 l.Error("failed to begin transaction", "err", err) 971 return err 972 } 973 defer tx.Rollback() 974 975 if err := db.DeleteIssues( 976 tx, 977 did, 978 rkey, 979 ); err != nil { 980 l.Error("failed to delete", "err", err) 981 return fmt.Errorf("failed to delete issue record: %w", err) 982 } 983 if err := tx.Commit(); err != nil { 984 l.Error("failed to commit txn", "err", err) 985 return err 986 } 987 988 return nil 989 } 990 991 return nil 992} 993 994func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error { 995 did := e.Did 996 rkey := e.Commit.RKey 997 998 var err error 999 1000 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1001 l.Info("ingesting record") 1002 1003 ddb, ok := i.Db.Execer.(*db.DB) 1004 if !ok { 1005 return fmt.Errorf("failed to index pull record, invalid db cast") 1006 } 1007 1008 switch e.Commit.Operation { 1009 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1010 raw := json.RawMessage(e.Commit.Record) 1011 record := tangled.RepoPull{} 1012 err = json.Unmarshal(raw, &record) 1013 if err != nil { 1014 l.Error("invalid record", "err", err) 1015 return err 1016 } 1017 1018 ownerId, err := i.IdResolver.ResolveIdent(ctx, did) 1019 if err != nil { 1020 l.Error("failed to resolve did") 1021 return err 1022 } 1023 1024 // go through and fetch all blobs in parallel 1025 readers := make([]*io.ReadCloser, len(record.Rounds)) 1026 var mu sync.Mutex 1027 1028 g, gctx := errgroup.WithContext(ctx) 1029 1030 for idx, b := range record.Rounds { 1031 g.Go(func() error { 1032 // for some reason, a blob is empty 1033 if b.PatchBlob == nil { 1034 return fmt.Errorf("missing patchBlob in round %d", idx) 1035 } 1036 1037 ownerPds := ownerId.PDSEndpoint() 1038 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 1039 q := url.Query() 1040 q.Set("cid", b.PatchBlob.Ref.String()) 1041 q.Set("did", did) 1042 url.RawQuery = q.Encode() 1043 1044 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil) 1045 if err != nil { 1046 l.Error("failed to create request") 1047 return err 1048 } 1049 req.Header.Set("Content-Type", "application/json") 1050 1051 resp, err := http.DefaultClient.Do(req) 1052 if err != nil { 1053 l.Error("failed to make request") 1054 return err 1055 } 1056 1057 mu.Lock() 1058 readers[idx] = &resp.Body 1059 mu.Unlock() 1060 1061 return nil 1062 }) 1063 } 1064 1065 if err := g.Wait(); err != nil { 1066 for _, r := range readers { 1067 if r != nil && *r != nil { 1068 (*r).Close() 1069 } 1070 } 1071 return err 1072 } 1073 1074 defer func() { 1075 for _, r := range readers { 1076 if r != nil && *r != nil { 1077 (*r).Close() 1078 } 1079 } 1080 }() 1081 1082 pull, err := models.PullFromRecord(did, rkey, record, readers) 1083 if err != nil { 1084 return fmt.Errorf("failed to parse pull from record: %w", err) 1085 } 1086 if err := i.Validator.ValidatePull(pull); err != nil { 1087 return fmt.Errorf("failed to validate pull: %w", err) 1088 } 1089 1090 tx, err := ddb.BeginTx(ctx, nil) 1091 if err != nil { 1092 l.Error("failed to begin transaction", "err", err) 1093 return err 1094 } 1095 defer tx.Rollback() 1096 1097 err = db.PutPull(tx, pull) 1098 if err != nil { 1099 l.Error("failed to create pull", "err", err) 1100 return err 1101 } 1102 1103 err = tx.Commit() 1104 if err != nil { 1105 l.Error("failed to commit txn", "err", err) 1106 return err 1107 } 1108 1109 return nil 1110 1111 case jmodels.CommitOperationDelete: 1112 tx, err := ddb.BeginTx(ctx, nil) 1113 if err != nil { 1114 l.Error("failed to begin transaction", "err", err) 1115 return err 1116 } 1117 defer tx.Rollback() 1118 1119 if err := db.AbandonPulls( 1120 tx, 1121 orm.FilterEq("owner_did", did), 1122 orm.FilterEq("rkey", rkey), 1123 ); err != nil { 1124 l.Error("failed to abandon", "err", err) 1125 return fmt.Errorf("failed to abandon pull record: %w", err) 1126 } 1127 if err := tx.Commit(); err != nil { 1128 l.Error("failed to commit txn", "err", err) 1129 return err 1130 } 1131 1132 return nil 1133 } 1134 1135 return nil 1136} 1137 1138func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 1139 did := e.Did 1140 rkey := e.Commit.RKey 1141 1142 var err error 1143 1144 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1145 l.Info("ingesting record") 1146 1147 ddb, ok := i.Db.Execer.(*db.DB) 1148 if !ok { 1149 return fmt.Errorf("failed to index issue comment record, invalid db cast") 1150 } 1151 1152 switch e.Commit.Operation { 1153 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1154 raw := json.RawMessage(e.Commit.Record) 1155 record := tangled.RepoIssueComment{} 1156 err = json.Unmarshal(raw, &record) 1157 if err != nil { 1158 return fmt.Errorf("invalid record: %w", err) 1159 } 1160 1161 comment, err := models.IssueCommentFromRecord(did, rkey, record) 1162 if err != nil { 1163 return fmt.Errorf("failed to parse comment from record: %w", err) 1164 } 1165 1166 if err := i.Validator.ValidateIssueComment(comment); err != nil { 1167 return fmt.Errorf("failed to validate comment: %w", err) 1168 } 1169 1170 tx, err := ddb.Begin() 1171 if err != nil { 1172 return fmt.Errorf("failed to start transaction: %w", err) 1173 } 1174 defer tx.Rollback() 1175 1176 _, err = db.AddIssueComment(tx, *comment) 1177 if err != nil { 1178 return fmt.Errorf("failed to create issue comment: %w", err) 1179 } 1180 1181 return tx.Commit() 1182 1183 case jmodels.CommitOperationDelete: 1184 if err := db.DeleteIssueComments( 1185 ddb, 1186 orm.FilterEq("did", did), 1187 orm.FilterEq("rkey", rkey), 1188 ); err != nil { 1189 return fmt.Errorf("failed to delete issue comment record: %w", err) 1190 } 1191 1192 return nil 1193 } 1194 1195 return nil 1196} 1197 1198func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 1199 did := e.Did 1200 rkey := e.Commit.RKey 1201 1202 var err error 1203 1204 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1205 l.Info("ingesting record") 1206 1207 ddb, ok := i.Db.Execer.(*db.DB) 1208 if !ok { 1209 return fmt.Errorf("failed to index label definition, invalid db cast") 1210 } 1211 1212 switch e.Commit.Operation { 1213 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1214 raw := json.RawMessage(e.Commit.Record) 1215 record := tangled.LabelDefinition{} 1216 err = json.Unmarshal(raw, &record) 1217 if err != nil { 1218 return fmt.Errorf("invalid record: %w", err) 1219 } 1220 1221 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1222 if err != nil { 1223 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1224 } 1225 1226 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1227 return fmt.Errorf("failed to validate labeldef: %w", err) 1228 } 1229 1230 _, err = db.AddLabelDefinition(ddb, def) 1231 if err != nil { 1232 return fmt.Errorf("failed to create labeldef: %w", err) 1233 } 1234 1235 return nil 1236 1237 case jmodels.CommitOperationDelete: 1238 if err := db.DeleteLabelDefinition( 1239 ddb, 1240 orm.FilterEq("did", did), 1241 orm.FilterEq("rkey", rkey), 1242 ); err != nil { 1243 return fmt.Errorf("failed to delete labeldef record: %w", err) 1244 } 1245 1246 return nil 1247 } 1248 1249 return nil 1250} 1251 1252func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 1253 did := e.Did 1254 rkey := e.Commit.RKey 1255 1256 var err error 1257 1258 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1259 l.Info("ingesting record") 1260 1261 ddb, ok := i.Db.Execer.(*db.DB) 1262 if !ok { 1263 return fmt.Errorf("failed to index label op, invalid db cast") 1264 } 1265 1266 switch e.Commit.Operation { 1267 case jmodels.CommitOperationCreate: 1268 raw := json.RawMessage(e.Commit.Record) 1269 record := tangled.LabelOp{} 1270 err = json.Unmarshal(raw, &record) 1271 if err != nil { 1272 return fmt.Errorf("invalid record: %w", err) 1273 } 1274 1275 subject := syntax.ATURI(record.Subject) 1276 collection := subject.Collection() 1277 1278 var repo *models.Repo 1279 switch collection { 1280 case tangled.RepoIssueNSID: 1281 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1282 if err != nil || len(i) != 1 { 1283 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1284 } 1285 repo = i[0].Repo 1286 default: 1287 return fmt.Errorf("unsupported label subject: %s", collection) 1288 } 1289 1290 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1291 if err != nil { 1292 return fmt.Errorf("failed to build label application ctx: %w", err) 1293 } 1294 1295 ops := models.LabelOpsFromRecord(did, rkey, record) 1296 1297 for _, o := range ops { 1298 def, ok := actx.Defs[o.OperandKey] 1299 if !ok { 1300 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1301 } 1302 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1303 return fmt.Errorf("failed to validate labelop: %w", err) 1304 } 1305 } 1306 1307 tx, err := ddb.Begin() 1308 if err != nil { 1309 return err 1310 } 1311 defer tx.Rollback() 1312 1313 for _, o := range ops { 1314 _, err = db.AddLabelOp(tx, &o) 1315 if err != nil { 1316 return fmt.Errorf("failed to add labelop: %w", err) 1317 } 1318 } 1319 1320 if err = tx.Commit(); err != nil { 1321 return err 1322 } 1323 } 1324 1325 return nil 1326}