Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 3e74eb7e955e3bf42e8d59ea0a0da4391c0a763c 1142 lines 28 kB view raw
1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "log/slog" 10 "maps" 11 "slices" 12 13 "time" 14 15 "github.com/avast/retry-go/v4" 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 jmodels "github.com/bluesky-social/jetstream/pkg/models" 18 "github.com/go-git/go-git/v5/plumbing" 19 "github.com/ipfs/go-cid" 20 "tangled.org/core/api/tangled" 21 "tangled.org/core/appview/config" 22 "tangled.org/core/appview/db" 23 "tangled.org/core/appview/models" 24 "tangled.org/core/appview/serververify" 25 "tangled.org/core/appview/validator" 26 "tangled.org/core/idresolver" 27 "tangled.org/core/orm" 28 "tangled.org/core/rbac" 29) 30 31type Ingester struct { 32 Db db.DbWrapper 33 Enforcer *rbac.Enforcer 34 IdResolver *idresolver.Resolver 35 Config *config.Config 36 Logger *slog.Logger 37 Validator *validator.Validator 38} 39 40type processFunc func(ctx context.Context, e *jmodels.Event) error 41 42func (i *Ingester) Ingest() processFunc { 43 return func(ctx context.Context, e *jmodels.Event) error { 44 var err error 45 46 l := i.Logger.With("kind", e.Kind) 47 switch e.Kind { 48 case jmodels.EventKindAccount: 49 if !e.Account.Active && *e.Account.Status == "deactivated" { 50 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 51 } 52 case jmodels.EventKindIdentity: 53 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 54 case jmodels.EventKindCommit: 55 switch e.Commit.Collection { 56 case tangled.GraphFollowNSID: 57 err = i.ingestFollow(e) 58 case tangled.FeedStarNSID: 59 err = i.ingestStar(e) 60 case tangled.PublicKeyNSID: 61 err = i.ingestPublicKey(e) 62 case tangled.RepoArtifactNSID: 63 err = i.ingestArtifact(e) 64 case tangled.ActorProfileNSID: 65 err = i.ingestProfile(ctx, e) 66 case tangled.SpindleMemberNSID: 67 err = i.ingestSpindleMember(ctx, e) 68 case tangled.SpindleNSID: 69 err = i.ingestSpindle(ctx, e) 70 case tangled.KnotMemberNSID: 71 err = i.ingestKnotMember(e) 72 case tangled.KnotNSID: 73 err = i.ingestKnot(e) 74 case tangled.StringNSID: 75 err = i.ingestString(e) 76 case tangled.RepoIssueNSID: 77 err = i.ingestIssue(ctx, e) 78 case tangled.RepoIssueCommentNSID: 79 err = i.ingestIssueComment(e) 80 case tangled.LabelDefinitionNSID: 81 err = i.ingestLabelDefinition(e) 82 case tangled.LabelOpNSID: 83 err = i.ingestLabelOp(e) 84 } 85 l = i.Logger.With("nsid", e.Commit.Collection) 86 } 87 88 if err != nil { 89 l.Warn("failed to ingest record, skipping", "err", err) 90 } 91 92 lastTimeUs := e.TimeUS + 1 93 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 94 l.Error("failed to save cursor", "err", saveErr) 95 } 96 97 return nil 98 } 99} 100 101func (i *Ingester) ingestStar(e *jmodels.Event) error { 102 var err error 103 did := e.Did 104 105 l := i.Logger.With("handler", "ingestStar") 106 l = l.With("nsid", e.Commit.Collection) 107 108 switch e.Commit.Operation { 109 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 110 var subjectUri syntax.ATURI 111 112 raw := json.RawMessage(e.Commit.Record) 113 record := tangled.FeedStar{} 114 err := json.Unmarshal(raw, &record) 115 if err != nil { 116 l.Error("invalid record", "err", err) 117 return err 118 } 119 120 star := &models.Star{ 121 Did: did, 122 Rkey: e.Commit.RKey, 123 } 124 125 switch { 126 case record.SubjectDid != nil: 127 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid)) 128 if repoErr == nil { 129 subjectUri = repo.RepoAt() 130 star.RepoAt = subjectUri 131 } 132 case record.Subject != nil: 133 subjectUri, err = syntax.ParseATURI(*record.Subject) 134 if err != nil { 135 l.Error("invalid record", "err", err) 136 return err 137 } 138 star.RepoAt = subjectUri 139 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String()) 140 if repoErr == nil && repo.RepoDid != "" { 141 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil { 142 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 143 } 144 } 145 default: 146 l.Error("star record has neither subject nor subjectDid") 147 return fmt.Errorf("star record has neither subject nor subjectDid") 148 } 149 err = db.AddStar(i.Db, star) 150 case jmodels.CommitOperationDelete: 151 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 152 } 153 154 if err != nil { 155 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 156 } 157 158 return nil 159} 160 161func (i *Ingester) ingestFollow(e *jmodels.Event) error { 162 var err error 163 did := e.Did 164 165 l := i.Logger.With("handler", "ingestFollow") 166 l = l.With("nsid", e.Commit.Collection) 167 168 switch e.Commit.Operation { 169 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 170 raw := json.RawMessage(e.Commit.Record) 171 record := tangled.GraphFollow{} 172 err = json.Unmarshal(raw, &record) 173 if err != nil { 174 l.Error("invalid record", "err", err) 175 return err 176 } 177 178 err = db.AddFollow(i.Db, &models.Follow{ 179 UserDid: did, 180 SubjectDid: record.Subject, 181 Rkey: e.Commit.RKey, 182 }) 183 case jmodels.CommitOperationDelete: 184 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 185 } 186 187 if err != nil { 188 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 189 } 190 191 return nil 192} 193 194func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 195 did := e.Did 196 var err error 197 198 l := i.Logger.With("handler", "ingestPublicKey") 199 l = l.With("nsid", e.Commit.Collection) 200 201 switch e.Commit.Operation { 202 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 203 l.Debug("processing add of pubkey") 204 raw := json.RawMessage(e.Commit.Record) 205 record := tangled.PublicKey{} 206 err = json.Unmarshal(raw, &record) 207 if err != nil { 208 l.Error("invalid record", "err", err) 209 return err 210 } 211 212 name := record.Name 213 key := record.Key 214 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 215 case jmodels.CommitOperationDelete: 216 l.Debug("processing delete of pubkey") 217 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 218 } 219 220 if err != nil { 221 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 222 } 223 224 return nil 225} 226 227func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 228 did := e.Did 229 var err error 230 231 l := i.Logger.With("handler", "ingestArtifact") 232 l = l.With("nsid", e.Commit.Collection) 233 234 switch e.Commit.Operation { 235 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 236 raw := json.RawMessage(e.Commit.Record) 237 record := tangled.RepoArtifact{} 238 err = json.Unmarshal(raw, &record) 239 if err != nil { 240 l.Error("invalid record", "err", err) 241 return err 242 } 243 244 var repo *models.Repo 245 if record.RepoDid != nil && *record.RepoDid != "" { 246 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 247 if err != nil && !errors.Is(err, sql.ErrNoRows) { 248 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 249 } 250 } 251 if repo == nil && record.Repo != nil { 252 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 253 if parseErr != nil { 254 return parseErr 255 } 256 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 257 if err != nil { 258 return err 259 } 260 } 261 if repo == nil { 262 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 263 } 264 265 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 266 if err != nil || !ok { 267 return err 268 } 269 270 repoDid := repo.RepoDid 271 if repoDid == "" && record.RepoDid != nil { 272 repoDid = *record.RepoDid 273 } 274 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 275 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil { 276 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 277 } 278 } 279 280 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 281 if err != nil { 282 createdAt = time.Now() 283 } 284 285 artifact := models.Artifact{ 286 Did: did, 287 Rkey: e.Commit.RKey, 288 RepoAt: repo.RepoAt(), 289 Tag: plumbing.Hash(record.Tag), 290 CreatedAt: createdAt, 291 BlobCid: cid.Cid(record.Artifact.Ref), 292 Name: record.Name, 293 Size: uint64(record.Artifact.Size), 294 MimeType: record.Artifact.MimeType, 295 } 296 297 err = db.AddArtifact(i.Db, artifact) 298 case jmodels.CommitOperationDelete: 299 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 300 } 301 302 if err != nil { 303 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 304 } 305 306 return nil 307} 308 309func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error { 310 did := e.Did 311 var err error 312 313 l := i.Logger.With("handler", "ingestProfile") 314 l = l.With("nsid", e.Commit.Collection) 315 316 if e.Commit.RKey != "self" { 317 return fmt.Errorf("ingestProfile only ingests `self` record") 318 } 319 320 switch e.Commit.Operation { 321 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 322 raw := json.RawMessage(e.Commit.Record) 323 record := tangled.ActorProfile{} 324 err = json.Unmarshal(raw, &record) 325 if err != nil { 326 l.Error("invalid record", "err", err) 327 return err 328 } 329 330 avatar := "" 331 if record.Avatar != nil { 332 avatar = record.Avatar.Ref.String() 333 } 334 335 description := "" 336 if record.Description != nil { 337 description = *record.Description 338 } 339 340 includeBluesky := record.Bluesky 341 342 pronouns := "" 343 if record.Pronouns != nil { 344 pronouns = *record.Pronouns 345 } 346 347 location := "" 348 if record.Location != nil { 349 location = *record.Location 350 } 351 352 var links [5]string 353 for i, l := range record.Links { 354 if i < 5 { 355 links[i] = l 356 } 357 } 358 359 var stats [2]models.VanityStat 360 for i, s := range record.Stats { 361 if i < 2 { 362 stats[i].Kind = models.ParseVanityStatKind(s) 363 } 364 } 365 366 var pinned [6]string 367 for i, r := range record.PinnedRepositories { 368 if i < 6 { 369 pinned[i] = r 370 } 371 } 372 373 var preferredHandle syntax.Handle 374 if record.PreferredHandle != nil { 375 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 376 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 377 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 378 preferredHandle = h 379 } 380 } 381 } 382 383 profile := models.Profile{ 384 Did: did, 385 Avatar: avatar, 386 Description: description, 387 IncludeBluesky: includeBluesky, 388 Location: location, 389 Links: links, 390 Stats: stats, 391 PinnedRepos: pinned, 392 Pronouns: pronouns, 393 PreferredHandle: preferredHandle, 394 } 395 396 ddb, ok := i.Db.Execer.(*db.DB) 397 if !ok { 398 return fmt.Errorf("failed to index profile record, invalid db cast") 399 } 400 401 tx, err := ddb.Begin() 402 if err != nil { 403 return fmt.Errorf("failed to start transaction") 404 } 405 406 err = db.ValidateProfile(tx, &profile) 407 if err != nil { 408 return fmt.Errorf("invalid profile record") 409 } 410 411 err = db.UpsertProfile(tx, &profile) 412 case jmodels.CommitOperationDelete: 413 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 414 } 415 416 if err != nil { 417 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 418 } 419 420 return nil 421} 422 423func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 424 did := e.Did 425 var err error 426 427 l := i.Logger.With("handler", "ingestSpindleMember") 428 l = l.With("nsid", e.Commit.Collection) 429 430 switch e.Commit.Operation { 431 case jmodels.CommitOperationCreate: 432 raw := json.RawMessage(e.Commit.Record) 433 record := tangled.SpindleMember{} 434 err = json.Unmarshal(raw, &record) 435 if err != nil { 436 l.Error("invalid record", "err", err) 437 return err 438 } 439 440 // only spindle owner can invite to spindles 441 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 442 if err != nil || !ok { 443 return fmt.Errorf("failed to enforce permissions: %w", err) 444 } 445 446 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 447 if err != nil { 448 return err 449 } 450 451 if memberId.Handle.IsInvalidHandle() { 452 return err 453 } 454 455 ddb, ok := i.Db.Execer.(*db.DB) 456 if !ok { 457 return fmt.Errorf("invalid db cast") 458 } 459 460 err = db.AddSpindleMember(ddb, models.SpindleMember{ 461 Did: syntax.DID(did), 462 Rkey: e.Commit.RKey, 463 Instance: record.Instance, 464 Subject: memberId.DID, 465 }) 466 if !ok { 467 return fmt.Errorf("failed to add to db: %w", err) 468 } 469 470 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 471 if err != nil { 472 return fmt.Errorf("failed to update ACLs: %w", err) 473 } 474 475 l.Info("added spindle member") 476 case jmodels.CommitOperationDelete: 477 rkey := e.Commit.RKey 478 479 ddb, ok := i.Db.Execer.(*db.DB) 480 if !ok { 481 return fmt.Errorf("failed to index profile record, invalid db cast") 482 } 483 484 // get record from db first 485 members, err := db.GetSpindleMembers( 486 ddb, 487 orm.FilterEq("did", did), 488 orm.FilterEq("rkey", rkey), 489 ) 490 if err != nil || len(members) != 1 { 491 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 492 } 493 member := members[0] 494 495 tx, err := ddb.Begin() 496 if err != nil { 497 return fmt.Errorf("failed to start txn: %w", err) 498 } 499 500 // remove record by rkey && update enforcer 501 if err = db.RemoveSpindleMember( 502 tx, 503 orm.FilterEq("did", did), 504 orm.FilterEq("rkey", rkey), 505 ); err != nil { 506 return fmt.Errorf("failed to remove from db: %w", err) 507 } 508 509 // update enforcer 510 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 511 if err != nil { 512 return fmt.Errorf("failed to update ACLs: %w", err) 513 } 514 515 if err = tx.Commit(); err != nil { 516 return fmt.Errorf("failed to commit txn: %w", err) 517 } 518 519 if err = i.Enforcer.E.SavePolicy(); err != nil { 520 return fmt.Errorf("failed to save ACLs: %w", err) 521 } 522 523 l.Info("removed spindle member") 524 } 525 526 return nil 527} 528 529func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 530 did := e.Did 531 var err error 532 533 l := i.Logger.With("handler", "ingestSpindle") 534 l = l.With("nsid", e.Commit.Collection) 535 536 switch e.Commit.Operation { 537 case jmodels.CommitOperationCreate: 538 raw := json.RawMessage(e.Commit.Record) 539 record := tangled.Spindle{} 540 err = json.Unmarshal(raw, &record) 541 if err != nil { 542 l.Error("invalid record", "err", err) 543 return err 544 } 545 546 instance := e.Commit.RKey 547 548 ddb, ok := i.Db.Execer.(*db.DB) 549 if !ok { 550 return fmt.Errorf("failed to index profile record, invalid db cast") 551 } 552 553 err := db.AddSpindle(ddb, models.Spindle{ 554 Owner: syntax.DID(did), 555 Instance: instance, 556 }) 557 if err != nil { 558 l.Error("failed to add spindle to db", "err", err, "instance", instance) 559 return err 560 } 561 562 err = retry.Do( 563 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 564 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 565 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 566 ) 567 if err != nil { 568 l.Error("failed to verify spindle after retries", "err", err, "instance", instance) 569 return err 570 } 571 572 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 573 if err != nil { 574 return fmt.Errorf("failed to mark verified: %w", err) 575 } 576 577 return nil 578 579 case jmodels.CommitOperationDelete: 580 instance := e.Commit.RKey 581 582 ddb, ok := i.Db.Execer.(*db.DB) 583 if !ok { 584 return fmt.Errorf("failed to index profile record, invalid db cast") 585 } 586 587 // get record from db first 588 spindles, err := db.GetSpindles( 589 ctx, 590 ddb, 591 orm.FilterEq("owner", did), 592 orm.FilterEq("instance", instance), 593 ) 594 if err != nil || len(spindles) != 1 { 595 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 596 } 597 spindle := spindles[0] 598 599 tx, err := ddb.Begin() 600 if err != nil { 601 return err 602 } 603 defer func() { 604 tx.Rollback() 605 i.Enforcer.E.LoadPolicy() 606 }() 607 608 // remove spindle members first 609 err = db.RemoveSpindleMember( 610 tx, 611 orm.FilterEq("owner", did), 612 orm.FilterEq("instance", instance), 613 ) 614 if err != nil { 615 return err 616 } 617 618 err = db.DeleteSpindle( 619 tx, 620 orm.FilterEq("owner", did), 621 orm.FilterEq("instance", instance), 622 ) 623 if err != nil { 624 return err 625 } 626 627 if spindle.Verified != nil { 628 err = i.Enforcer.RemoveSpindle(instance) 629 if err != nil { 630 return err 631 } 632 } 633 634 err = tx.Commit() 635 if err != nil { 636 return err 637 } 638 639 err = i.Enforcer.E.SavePolicy() 640 if err != nil { 641 return err 642 } 643 } 644 645 return nil 646} 647 648func (i *Ingester) ingestString(e *jmodels.Event) error { 649 did := e.Did 650 rkey := e.Commit.RKey 651 652 var err error 653 654 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 655 l.Info("ingesting record") 656 657 ddb, ok := i.Db.Execer.(*db.DB) 658 if !ok { 659 return fmt.Errorf("failed to index string record, invalid db cast") 660 } 661 662 switch e.Commit.Operation { 663 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 664 raw := json.RawMessage(e.Commit.Record) 665 record := tangled.String{} 666 err = json.Unmarshal(raw, &record) 667 if err != nil { 668 l.Error("invalid record", "err", err) 669 return err 670 } 671 672 string := models.StringFromRecord(did, rkey, record) 673 674 if err = i.Validator.ValidateString(&string); err != nil { 675 l.Error("invalid record", "err", err) 676 return err 677 } 678 679 if err = db.AddString(ddb, string); err != nil { 680 l.Error("failed to add string", "err", err) 681 return err 682 } 683 684 return nil 685 686 case jmodels.CommitOperationDelete: 687 if err := db.DeleteString( 688 ddb, 689 orm.FilterEq("did", did), 690 orm.FilterEq("rkey", rkey), 691 ); err != nil { 692 l.Error("failed to delete", "err", err) 693 return fmt.Errorf("failed to delete string record: %w", err) 694 } 695 696 return nil 697 } 698 699 return nil 700} 701 702func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 703 did := e.Did 704 var err error 705 706 l := i.Logger.With("handler", "ingestKnotMember") 707 l = l.With("nsid", e.Commit.Collection) 708 709 switch e.Commit.Operation { 710 case jmodels.CommitOperationCreate: 711 raw := json.RawMessage(e.Commit.Record) 712 record := tangled.KnotMember{} 713 err = json.Unmarshal(raw, &record) 714 if err != nil { 715 l.Error("invalid record", "err", err) 716 return err 717 } 718 719 // only knot owner can invite to knots 720 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 721 if err != nil || !ok { 722 return fmt.Errorf("failed to enforce permissions: %w", err) 723 } 724 725 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 726 if err != nil { 727 return err 728 } 729 730 if memberId.Handle.IsInvalidHandle() { 731 return err 732 } 733 734 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 735 if err != nil { 736 return fmt.Errorf("failed to update ACLs: %w", err) 737 } 738 739 l.Info("added knot member") 740 case jmodels.CommitOperationDelete: 741 // we don't store knot members in a table (like we do for spindle) 742 // and we can't remove this just yet. possibly fixed if we switch 743 // to either: 744 // 1. a knot_members table like with spindle and store the rkey 745 // 2. use the knot host as the rkey 746 // 747 // TODO: implement member deletion 748 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 749 } 750 751 return nil 752} 753 754func (i *Ingester) ingestKnot(e *jmodels.Event) error { 755 did := e.Did 756 var err error 757 758 l := i.Logger.With("handler", "ingestKnot") 759 l = l.With("nsid", e.Commit.Collection) 760 761 switch e.Commit.Operation { 762 case jmodels.CommitOperationCreate: 763 raw := json.RawMessage(e.Commit.Record) 764 record := tangled.Knot{} 765 err = json.Unmarshal(raw, &record) 766 if err != nil { 767 l.Error("invalid record", "err", err) 768 return err 769 } 770 771 domain := e.Commit.RKey 772 773 ddb, ok := i.Db.Execer.(*db.DB) 774 if !ok { 775 return fmt.Errorf("failed to index profile record, invalid db cast") 776 } 777 778 err := db.AddKnot(ddb, domain, did) 779 if err != nil { 780 l.Error("failed to add knot to db", "err", err, "domain", domain) 781 return err 782 } 783 784 err = retry.Do( 785 func() error { 786 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 787 }, 788 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 789 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 790 ) 791 if err != nil { 792 l.Error("failed to verify knot after retries", "err", err, "domain", domain) 793 return err 794 } 795 796 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 797 if err != nil { 798 return fmt.Errorf("failed to mark verified: %w", err) 799 } 800 801 return nil 802 803 case jmodels.CommitOperationDelete: 804 domain := e.Commit.RKey 805 806 ddb, ok := i.Db.Execer.(*db.DB) 807 if !ok { 808 return fmt.Errorf("failed to index knot record, invalid db cast") 809 } 810 811 // get record from db first 812 registrations, err := db.GetRegistrations( 813 ddb, 814 orm.FilterEq("domain", domain), 815 orm.FilterEq("did", did), 816 ) 817 if err != nil { 818 return fmt.Errorf("failed to get registration: %w", err) 819 } 820 if len(registrations) != 1 { 821 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 822 } 823 registration := registrations[0] 824 825 tx, err := ddb.Begin() 826 if err != nil { 827 return err 828 } 829 defer func() { 830 tx.Rollback() 831 i.Enforcer.E.LoadPolicy() 832 }() 833 834 err = db.DeleteKnot( 835 tx, 836 orm.FilterEq("did", did), 837 orm.FilterEq("domain", domain), 838 ) 839 if err != nil { 840 return err 841 } 842 843 if registration.Registered != nil { 844 err = i.Enforcer.RemoveKnot(domain) 845 if err != nil { 846 return err 847 } 848 } 849 850 err = tx.Commit() 851 if err != nil { 852 return err 853 } 854 855 err = i.Enforcer.E.SavePolicy() 856 if err != nil { 857 return err 858 } 859 } 860 861 return nil 862} 863func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 864 did := e.Did 865 rkey := e.Commit.RKey 866 867 var err error 868 869 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 870 l.Info("ingesting record") 871 872 ddb, ok := i.Db.Execer.(*db.DB) 873 if !ok { 874 return fmt.Errorf("failed to index issue record, invalid db cast") 875 } 876 877 switch e.Commit.Operation { 878 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 879 raw := json.RawMessage(e.Commit.Record) 880 record := tangled.RepoIssue{} 881 err = json.Unmarshal(raw, &record) 882 if err != nil { 883 l.Error("invalid record", "err", err) 884 return err 885 } 886 887 issue := models.IssueFromRecord(did, rkey, record) 888 889 if issue.RepoAt == "" { 890 return fmt.Errorf("issue record has no repo field") 891 } 892 893 if err := i.Validator.ValidateIssue(&issue); err != nil { 894 return fmt.Errorf("failed to validate issue: %w", err) 895 } 896 897 if record.Repo != nil { 898 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo) 899 if repoErr == nil && repo.RepoDid != "" { 900 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil { 901 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 902 } 903 } 904 } 905 906 tx, err := ddb.BeginTx(ctx, nil) 907 if err != nil { 908 l.Error("failed to begin transaction", "err", err) 909 return err 910 } 911 defer tx.Rollback() 912 913 err = db.PutIssue(tx, &issue) 914 if err != nil { 915 l.Error("failed to create issue", "err", err) 916 return err 917 } 918 919 err = tx.Commit() 920 if err != nil { 921 l.Error("failed to commit txn", "err", err) 922 return err 923 } 924 925 return nil 926 927 case jmodels.CommitOperationDelete: 928 tx, err := ddb.BeginTx(ctx, nil) 929 if err != nil { 930 l.Error("failed to begin transaction", "err", err) 931 return err 932 } 933 defer tx.Rollback() 934 935 if err := db.DeleteIssues( 936 tx, 937 did, 938 rkey, 939 ); err != nil { 940 l.Error("failed to delete", "err", err) 941 return fmt.Errorf("failed to delete issue record: %w", err) 942 } 943 if err := tx.Commit(); err != nil { 944 l.Error("failed to commit txn", "err", err) 945 return err 946 } 947 948 return nil 949 } 950 951 return nil 952} 953 954func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 955 did := e.Did 956 rkey := e.Commit.RKey 957 958 var err error 959 960 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 961 l.Info("ingesting record") 962 963 ddb, ok := i.Db.Execer.(*db.DB) 964 if !ok { 965 return fmt.Errorf("failed to index issue comment record, invalid db cast") 966 } 967 968 switch e.Commit.Operation { 969 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 970 raw := json.RawMessage(e.Commit.Record) 971 record := tangled.RepoIssueComment{} 972 err = json.Unmarshal(raw, &record) 973 if err != nil { 974 return fmt.Errorf("invalid record: %w", err) 975 } 976 977 comment, err := models.IssueCommentFromRecord(did, rkey, record) 978 if err != nil { 979 return fmt.Errorf("failed to parse comment from record: %w", err) 980 } 981 982 if err := i.Validator.ValidateIssueComment(comment); err != nil { 983 return fmt.Errorf("failed to validate comment: %w", err) 984 } 985 986 tx, err := ddb.Begin() 987 if err != nil { 988 return fmt.Errorf("failed to start transaction: %w", err) 989 } 990 defer tx.Rollback() 991 992 _, err = db.AddIssueComment(tx, *comment) 993 if err != nil { 994 return fmt.Errorf("failed to create issue comment: %w", err) 995 } 996 997 return tx.Commit() 998 999 case jmodels.CommitOperationDelete: 1000 if err := db.DeleteIssueComments( 1001 ddb, 1002 orm.FilterEq("did", did), 1003 orm.FilterEq("rkey", rkey), 1004 ); err != nil { 1005 return fmt.Errorf("failed to delete issue comment record: %w", err) 1006 } 1007 1008 return nil 1009 } 1010 1011 return nil 1012} 1013 1014func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 1015 did := e.Did 1016 rkey := e.Commit.RKey 1017 1018 var err error 1019 1020 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1021 l.Info("ingesting record") 1022 1023 ddb, ok := i.Db.Execer.(*db.DB) 1024 if !ok { 1025 return fmt.Errorf("failed to index label definition, invalid db cast") 1026 } 1027 1028 switch e.Commit.Operation { 1029 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1030 raw := json.RawMessage(e.Commit.Record) 1031 record := tangled.LabelDefinition{} 1032 err = json.Unmarshal(raw, &record) 1033 if err != nil { 1034 return fmt.Errorf("invalid record: %w", err) 1035 } 1036 1037 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1038 if err != nil { 1039 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1040 } 1041 1042 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1043 return fmt.Errorf("failed to validate labeldef: %w", err) 1044 } 1045 1046 _, err = db.AddLabelDefinition(ddb, def) 1047 if err != nil { 1048 return fmt.Errorf("failed to create labeldef: %w", err) 1049 } 1050 1051 return nil 1052 1053 case jmodels.CommitOperationDelete: 1054 if err := db.DeleteLabelDefinition( 1055 ddb, 1056 orm.FilterEq("did", did), 1057 orm.FilterEq("rkey", rkey), 1058 ); err != nil { 1059 return fmt.Errorf("failed to delete labeldef record: %w", err) 1060 } 1061 1062 return nil 1063 } 1064 1065 return nil 1066} 1067 1068func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 1069 did := e.Did 1070 rkey := e.Commit.RKey 1071 1072 var err error 1073 1074 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1075 l.Info("ingesting record") 1076 1077 ddb, ok := i.Db.Execer.(*db.DB) 1078 if !ok { 1079 return fmt.Errorf("failed to index label op, invalid db cast") 1080 } 1081 1082 switch e.Commit.Operation { 1083 case jmodels.CommitOperationCreate: 1084 raw := json.RawMessage(e.Commit.Record) 1085 record := tangled.LabelOp{} 1086 err = json.Unmarshal(raw, &record) 1087 if err != nil { 1088 return fmt.Errorf("invalid record: %w", err) 1089 } 1090 1091 subject := syntax.ATURI(record.Subject) 1092 collection := subject.Collection() 1093 1094 var repo *models.Repo 1095 switch collection { 1096 case tangled.RepoIssueNSID: 1097 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1098 if err != nil || len(i) != 1 { 1099 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1100 } 1101 repo = i[0].Repo 1102 default: 1103 return fmt.Errorf("unsupport label subject: %s", collection) 1104 } 1105 1106 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1107 if err != nil { 1108 return fmt.Errorf("failed to build label application ctx: %w", err) 1109 } 1110 1111 ops := models.LabelOpsFromRecord(did, rkey, record) 1112 1113 for _, o := range ops { 1114 def, ok := actx.Defs[o.OperandKey] 1115 if !ok { 1116 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1117 } 1118 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1119 return fmt.Errorf("failed to validate labelop: %w", err) 1120 } 1121 } 1122 1123 tx, err := ddb.Begin() 1124 if err != nil { 1125 return err 1126 } 1127 defer tx.Rollback() 1128 1129 for _, o := range ops { 1130 _, err = db.AddLabelOp(tx, &o) 1131 if err != nil { 1132 return fmt.Errorf("failed to add labelop: %w", err) 1133 } 1134 } 1135 1136 if err = tx.Commit(); err != nil { 1137 return err 1138 } 1139 } 1140 1141 return nil 1142}