Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 1133 lines 28 kB view raw
1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "log/slog" 10 "maps" 11 "slices" 12 13 "time" 14 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 jmodels "github.com/bluesky-social/jetstream/pkg/models" 17 "github.com/go-git/go-git/v5/plumbing" 18 "github.com/ipfs/go-cid" 19 "tangled.org/core/api/tangled" 20 "tangled.org/core/appview/config" 21 "tangled.org/core/appview/db" 22 "tangled.org/core/appview/models" 23 "tangled.org/core/appview/serververify" 24 "tangled.org/core/appview/validator" 25 "tangled.org/core/idresolver" 26 "tangled.org/core/orm" 27 "tangled.org/core/rbac" 28) 29 30type Ingester struct { 31 Db db.DbWrapper 32 Enforcer *rbac.Enforcer 33 IdResolver *idresolver.Resolver 34 Config *config.Config 35 Logger *slog.Logger 36 Validator *validator.Validator 37} 38 39type processFunc func(ctx context.Context, e *jmodels.Event) error 40 41func (i *Ingester) Ingest() processFunc { 42 return func(ctx context.Context, e *jmodels.Event) error { 43 var err error 44 defer func() { 45 eventTime := e.TimeUS 46 lastTimeUs := eventTime + 1 47 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 48 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 49 } 50 }() 51 52 l := i.Logger.With("kind", e.Kind) 53 switch e.Kind { 54 case jmodels.EventKindAccount: 55 if !e.Account.Active && *e.Account.Status == "deactivated" { 56 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 57 } 58 case jmodels.EventKindIdentity: 59 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 60 case jmodels.EventKindCommit: 61 switch e.Commit.Collection { 62 case tangled.GraphFollowNSID: 63 err = i.ingestFollow(e) 64 case tangled.FeedStarNSID: 65 err = i.ingestStar(e) 66 case tangled.PublicKeyNSID: 67 err = i.ingestPublicKey(e) 68 case tangled.RepoArtifactNSID: 69 err = i.ingestArtifact(e) 70 case tangled.ActorProfileNSID: 71 err = i.ingestProfile(ctx, e) 72 case tangled.SpindleMemberNSID: 73 err = i.ingestSpindleMember(ctx, e) 74 case tangled.SpindleNSID: 75 err = i.ingestSpindle(ctx, e) 76 case tangled.KnotMemberNSID: 77 err = i.ingestKnotMember(e) 78 case tangled.KnotNSID: 79 err = i.ingestKnot(e) 80 case tangled.StringNSID: 81 err = i.ingestString(e) 82 case tangled.RepoIssueNSID: 83 err = i.ingestIssue(ctx, e) 84 case tangled.RepoIssueCommentNSID: 85 err = i.ingestIssueComment(e) 86 case tangled.LabelDefinitionNSID: 87 err = i.ingestLabelDefinition(e) 88 case tangled.LabelOpNSID: 89 err = i.ingestLabelOp(e) 90 } 91 l = i.Logger.With("nsid", e.Commit.Collection) 92 } 93 94 if err != nil { 95 l.Warn("refused to ingest record", "err", err) 96 } 97 98 return nil 99 } 100} 101 102func (i *Ingester) ingestStar(e *jmodels.Event) error { 103 var err error 104 did := e.Did 105 106 l := i.Logger.With("handler", "ingestStar") 107 l = l.With("nsid", e.Commit.Collection) 108 109 switch e.Commit.Operation { 110 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 111 var subjectUri syntax.ATURI 112 113 raw := json.RawMessage(e.Commit.Record) 114 record := tangled.FeedStar{} 115 err := json.Unmarshal(raw, &record) 116 if err != nil { 117 l.Error("invalid record", "err", err) 118 return err 119 } 120 121 star := &models.Star{ 122 Did: did, 123 Rkey: e.Commit.RKey, 124 } 125 126 switch { 127 case record.SubjectDid != nil: 128 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid)) 129 if repoErr == nil { 130 subjectUri = repo.RepoAt() 131 star.RepoAt = subjectUri 132 } 133 case record.Subject != nil: 134 subjectUri, err = syntax.ParseATURI(*record.Subject) 135 if err != nil { 136 l.Error("invalid record", "err", err) 137 return err 138 } 139 star.RepoAt = subjectUri 140 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String()) 141 if repoErr == nil && repo.RepoDid != "" { 142 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil { 143 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 144 } 145 } 146 default: 147 l.Error("star record has neither subject nor subjectDid") 148 return fmt.Errorf("star record has neither subject nor subjectDid") 149 } 150 err = db.AddStar(i.Db, star) 151 case jmodels.CommitOperationDelete: 152 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 153 } 154 155 if err != nil { 156 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 157 } 158 159 return nil 160} 161 162func (i *Ingester) ingestFollow(e *jmodels.Event) error { 163 var err error 164 did := e.Did 165 166 l := i.Logger.With("handler", "ingestFollow") 167 l = l.With("nsid", e.Commit.Collection) 168 169 switch e.Commit.Operation { 170 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 171 raw := json.RawMessage(e.Commit.Record) 172 record := tangled.GraphFollow{} 173 err = json.Unmarshal(raw, &record) 174 if err != nil { 175 l.Error("invalid record", "err", err) 176 return err 177 } 178 179 err = db.AddFollow(i.Db, &models.Follow{ 180 UserDid: did, 181 SubjectDid: record.Subject, 182 Rkey: e.Commit.RKey, 183 }) 184 case jmodels.CommitOperationDelete: 185 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 186 } 187 188 if err != nil { 189 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 190 } 191 192 return nil 193} 194 195func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 196 did := e.Did 197 var err error 198 199 l := i.Logger.With("handler", "ingestPublicKey") 200 l = l.With("nsid", e.Commit.Collection) 201 202 switch e.Commit.Operation { 203 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 204 l.Debug("processing add of pubkey") 205 raw := json.RawMessage(e.Commit.Record) 206 record := tangled.PublicKey{} 207 err = json.Unmarshal(raw, &record) 208 if err != nil { 209 l.Error("invalid record", "err", err) 210 return err 211 } 212 213 name := record.Name 214 key := record.Key 215 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 216 case jmodels.CommitOperationDelete: 217 l.Debug("processing delete of pubkey") 218 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 219 } 220 221 if err != nil { 222 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 223 } 224 225 return nil 226} 227 228func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 229 did := e.Did 230 var err error 231 232 l := i.Logger.With("handler", "ingestArtifact") 233 l = l.With("nsid", e.Commit.Collection) 234 235 switch e.Commit.Operation { 236 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 237 raw := json.RawMessage(e.Commit.Record) 238 record := tangled.RepoArtifact{} 239 err = json.Unmarshal(raw, &record) 240 if err != nil { 241 l.Error("invalid record", "err", err) 242 return err 243 } 244 245 var repo *models.Repo 246 if record.RepoDid != nil && *record.RepoDid != "" { 247 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 248 if err != nil && !errors.Is(err, sql.ErrNoRows) { 249 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 250 } 251 } 252 if repo == nil && record.Repo != nil { 253 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 254 if parseErr != nil { 255 return parseErr 256 } 257 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 258 if err != nil { 259 return err 260 } 261 } 262 if repo == nil { 263 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 264 } 265 266 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 267 if err != nil || !ok { 268 return err 269 } 270 271 repoDid := repo.RepoDid 272 if repoDid == "" && record.RepoDid != nil { 273 repoDid = *record.RepoDid 274 } 275 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 276 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil { 277 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 278 } 279 } 280 281 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 282 if err != nil { 283 createdAt = time.Now() 284 } 285 286 artifact := models.Artifact{ 287 Did: did, 288 Rkey: e.Commit.RKey, 289 RepoAt: repo.RepoAt(), 290 Tag: plumbing.Hash(record.Tag), 291 CreatedAt: createdAt, 292 BlobCid: cid.Cid(record.Artifact.Ref), 293 Name: record.Name, 294 Size: uint64(record.Artifact.Size), 295 MimeType: record.Artifact.MimeType, 296 } 297 298 err = db.AddArtifact(i.Db, artifact) 299 case jmodels.CommitOperationDelete: 300 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 301 } 302 303 if err != nil { 304 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 305 } 306 307 return nil 308} 309 310func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error { 311 did := e.Did 312 var err error 313 314 l := i.Logger.With("handler", "ingestProfile") 315 l = l.With("nsid", e.Commit.Collection) 316 317 if e.Commit.RKey != "self" { 318 return fmt.Errorf("ingestProfile only ingests `self` record") 319 } 320 321 switch e.Commit.Operation { 322 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 323 raw := json.RawMessage(e.Commit.Record) 324 record := tangled.ActorProfile{} 325 err = json.Unmarshal(raw, &record) 326 if err != nil { 327 l.Error("invalid record", "err", err) 328 return err 329 } 330 331 avatar := "" 332 if record.Avatar != nil { 333 avatar = record.Avatar.Ref.String() 334 } 335 336 description := "" 337 if record.Description != nil { 338 description = *record.Description 339 } 340 341 includeBluesky := record.Bluesky 342 343 pronouns := "" 344 if record.Pronouns != nil { 345 pronouns = *record.Pronouns 346 } 347 348 location := "" 349 if record.Location != nil { 350 location = *record.Location 351 } 352 353 var links [5]string 354 for i, l := range record.Links { 355 if i < 5 { 356 links[i] = l 357 } 358 } 359 360 var stats [2]models.VanityStat 361 for i, s := range record.Stats { 362 if i < 2 { 363 stats[i].Kind = models.ParseVanityStatKind(s) 364 } 365 } 366 367 var pinned [6]string 368 for i, r := range record.PinnedRepositories { 369 if i < 6 { 370 pinned[i] = r 371 } 372 } 373 374 var preferredHandle syntax.Handle 375 if record.PreferredHandle != nil { 376 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 377 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 378 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 379 preferredHandle = h 380 } 381 } 382 } 383 384 profile := models.Profile{ 385 Did: did, 386 Avatar: avatar, 387 Description: description, 388 IncludeBluesky: includeBluesky, 389 Location: location, 390 Links: links, 391 Stats: stats, 392 PinnedRepos: pinned, 393 Pronouns: pronouns, 394 PreferredHandle: preferredHandle, 395 } 396 397 ddb, ok := i.Db.Execer.(*db.DB) 398 if !ok { 399 return fmt.Errorf("failed to index profile record, invalid db cast") 400 } 401 402 tx, err := ddb.Begin() 403 if err != nil { 404 return fmt.Errorf("failed to start transaction") 405 } 406 407 err = db.ValidateProfile(tx, &profile) 408 if err != nil { 409 return fmt.Errorf("invalid profile record") 410 } 411 412 err = db.UpsertProfile(tx, &profile) 413 case jmodels.CommitOperationDelete: 414 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 415 } 416 417 if err != nil { 418 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 419 } 420 421 return nil 422} 423 424func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 425 did := e.Did 426 var err error 427 428 l := i.Logger.With("handler", "ingestSpindleMember") 429 l = l.With("nsid", e.Commit.Collection) 430 431 switch e.Commit.Operation { 432 case jmodels.CommitOperationCreate: 433 raw := json.RawMessage(e.Commit.Record) 434 record := tangled.SpindleMember{} 435 err = json.Unmarshal(raw, &record) 436 if err != nil { 437 l.Error("invalid record", "err", err) 438 return err 439 } 440 441 // only spindle owner can invite to spindles 442 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 443 if err != nil || !ok { 444 return fmt.Errorf("failed to enforce permissions: %w", err) 445 } 446 447 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 448 if err != nil { 449 return err 450 } 451 452 if memberId.Handle.IsInvalidHandle() { 453 return err 454 } 455 456 ddb, ok := i.Db.Execer.(*db.DB) 457 if !ok { 458 return fmt.Errorf("invalid db cast") 459 } 460 461 err = db.AddSpindleMember(ddb, models.SpindleMember{ 462 Did: syntax.DID(did), 463 Rkey: e.Commit.RKey, 464 Instance: record.Instance, 465 Subject: memberId.DID, 466 }) 467 if !ok { 468 return fmt.Errorf("failed to add to db: %w", err) 469 } 470 471 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 472 if err != nil { 473 return fmt.Errorf("failed to update ACLs: %w", err) 474 } 475 476 l.Info("added spindle member") 477 case jmodels.CommitOperationDelete: 478 rkey := e.Commit.RKey 479 480 ddb, ok := i.Db.Execer.(*db.DB) 481 if !ok { 482 return fmt.Errorf("failed to index profile record, invalid db cast") 483 } 484 485 // get record from db first 486 members, err := db.GetSpindleMembers( 487 ddb, 488 orm.FilterEq("did", did), 489 orm.FilterEq("rkey", rkey), 490 ) 491 if err != nil || len(members) != 1 { 492 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 493 } 494 member := members[0] 495 496 tx, err := ddb.Begin() 497 if err != nil { 498 return fmt.Errorf("failed to start txn: %w", err) 499 } 500 501 // remove record by rkey && update enforcer 502 if err = db.RemoveSpindleMember( 503 tx, 504 orm.FilterEq("did", did), 505 orm.FilterEq("rkey", rkey), 506 ); err != nil { 507 return fmt.Errorf("failed to remove from db: %w", err) 508 } 509 510 // update enforcer 511 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 512 if err != nil { 513 return fmt.Errorf("failed to update ACLs: %w", err) 514 } 515 516 if err = tx.Commit(); err != nil { 517 return fmt.Errorf("failed to commit txn: %w", err) 518 } 519 520 if err = i.Enforcer.E.SavePolicy(); err != nil { 521 return fmt.Errorf("failed to save ACLs: %w", err) 522 } 523 524 l.Info("removed spindle member") 525 } 526 527 return nil 528} 529 530func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 531 did := e.Did 532 var err error 533 534 l := i.Logger.With("handler", "ingestSpindle") 535 l = l.With("nsid", e.Commit.Collection) 536 537 switch e.Commit.Operation { 538 case jmodels.CommitOperationCreate: 539 raw := json.RawMessage(e.Commit.Record) 540 record := tangled.Spindle{} 541 err = json.Unmarshal(raw, &record) 542 if err != nil { 543 l.Error("invalid record", "err", err) 544 return err 545 } 546 547 instance := e.Commit.RKey 548 549 ddb, ok := i.Db.Execer.(*db.DB) 550 if !ok { 551 return fmt.Errorf("failed to index profile record, invalid db cast") 552 } 553 554 err := db.AddSpindle(ddb, models.Spindle{ 555 Owner: syntax.DID(did), 556 Instance: instance, 557 }) 558 if err != nil { 559 l.Error("failed to add spindle to db", "err", err, "instance", instance) 560 return err 561 } 562 563 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 564 if err != nil { 565 l.Error("failed to add spindle to db", "err", err, "instance", instance) 566 return err 567 } 568 569 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 570 if err != nil { 571 return fmt.Errorf("failed to mark verified: %w", err) 572 } 573 574 return nil 575 576 case jmodels.CommitOperationDelete: 577 instance := e.Commit.RKey 578 579 ddb, ok := i.Db.Execer.(*db.DB) 580 if !ok { 581 return fmt.Errorf("failed to index profile record, invalid db cast") 582 } 583 584 // get record from db first 585 spindles, err := db.GetSpindles( 586 ctx, 587 ddb, 588 orm.FilterEq("owner", did), 589 orm.FilterEq("instance", instance), 590 ) 591 if err != nil || len(spindles) != 1 { 592 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 593 } 594 spindle := spindles[0] 595 596 tx, err := ddb.Begin() 597 if err != nil { 598 return err 599 } 600 defer func() { 601 tx.Rollback() 602 i.Enforcer.E.LoadPolicy() 603 }() 604 605 // remove spindle members first 606 err = db.RemoveSpindleMember( 607 tx, 608 orm.FilterEq("owner", did), 609 orm.FilterEq("instance", instance), 610 ) 611 if err != nil { 612 return err 613 } 614 615 err = db.DeleteSpindle( 616 tx, 617 orm.FilterEq("owner", did), 618 orm.FilterEq("instance", instance), 619 ) 620 if err != nil { 621 return err 622 } 623 624 if spindle.Verified != nil { 625 err = i.Enforcer.RemoveSpindle(instance) 626 if err != nil { 627 return err 628 } 629 } 630 631 err = tx.Commit() 632 if err != nil { 633 return err 634 } 635 636 err = i.Enforcer.E.SavePolicy() 637 if err != nil { 638 return err 639 } 640 } 641 642 return nil 643} 644 645func (i *Ingester) ingestString(e *jmodels.Event) error { 646 did := e.Did 647 rkey := e.Commit.RKey 648 649 var err error 650 651 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 652 l.Info("ingesting record") 653 654 ddb, ok := i.Db.Execer.(*db.DB) 655 if !ok { 656 return fmt.Errorf("failed to index string record, invalid db cast") 657 } 658 659 switch e.Commit.Operation { 660 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 661 raw := json.RawMessage(e.Commit.Record) 662 record := tangled.String{} 663 err = json.Unmarshal(raw, &record) 664 if err != nil { 665 l.Error("invalid record", "err", err) 666 return err 667 } 668 669 string := models.StringFromRecord(did, rkey, record) 670 671 if err = i.Validator.ValidateString(&string); err != nil { 672 l.Error("invalid record", "err", err) 673 return err 674 } 675 676 if err = db.AddString(ddb, string); err != nil { 677 l.Error("failed to add string", "err", err) 678 return err 679 } 680 681 return nil 682 683 case jmodels.CommitOperationDelete: 684 if err := db.DeleteString( 685 ddb, 686 orm.FilterEq("did", did), 687 orm.FilterEq("rkey", rkey), 688 ); err != nil { 689 l.Error("failed to delete", "err", err) 690 return fmt.Errorf("failed to delete string record: %w", err) 691 } 692 693 return nil 694 } 695 696 return nil 697} 698 699func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 700 did := e.Did 701 var err error 702 703 l := i.Logger.With("handler", "ingestKnotMember") 704 l = l.With("nsid", e.Commit.Collection) 705 706 switch e.Commit.Operation { 707 case jmodels.CommitOperationCreate: 708 raw := json.RawMessage(e.Commit.Record) 709 record := tangled.KnotMember{} 710 err = json.Unmarshal(raw, &record) 711 if err != nil { 712 l.Error("invalid record", "err", err) 713 return err 714 } 715 716 // only knot owner can invite to knots 717 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 718 if err != nil || !ok { 719 return fmt.Errorf("failed to enforce permissions: %w", err) 720 } 721 722 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 723 if err != nil { 724 return err 725 } 726 727 if memberId.Handle.IsInvalidHandle() { 728 return err 729 } 730 731 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 732 if err != nil { 733 return fmt.Errorf("failed to update ACLs: %w", err) 734 } 735 736 l.Info("added knot member") 737 case jmodels.CommitOperationDelete: 738 // we don't store knot members in a table (like we do for spindle) 739 // and we can't remove this just yet. possibly fixed if we switch 740 // to either: 741 // 1. a knot_members table like with spindle and store the rkey 742 // 2. use the knot host as the rkey 743 // 744 // TODO: implement member deletion 745 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 746 } 747 748 return nil 749} 750 751func (i *Ingester) ingestKnot(e *jmodels.Event) error { 752 did := e.Did 753 var err error 754 755 l := i.Logger.With("handler", "ingestKnot") 756 l = l.With("nsid", e.Commit.Collection) 757 758 switch e.Commit.Operation { 759 case jmodels.CommitOperationCreate: 760 raw := json.RawMessage(e.Commit.Record) 761 record := tangled.Knot{} 762 err = json.Unmarshal(raw, &record) 763 if err != nil { 764 l.Error("invalid record", "err", err) 765 return err 766 } 767 768 domain := e.Commit.RKey 769 770 ddb, ok := i.Db.Execer.(*db.DB) 771 if !ok { 772 return fmt.Errorf("failed to index profile record, invalid db cast") 773 } 774 775 err := db.AddKnot(ddb, domain, did) 776 if err != nil { 777 l.Error("failed to add knot to db", "err", err, "domain", domain) 778 return err 779 } 780 781 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 782 if err != nil { 783 l.Error("failed to verify knot", "err", err, "domain", domain) 784 return err 785 } 786 787 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 788 if err != nil { 789 return fmt.Errorf("failed to mark verified: %w", err) 790 } 791 792 return nil 793 794 case jmodels.CommitOperationDelete: 795 domain := e.Commit.RKey 796 797 ddb, ok := i.Db.Execer.(*db.DB) 798 if !ok { 799 return fmt.Errorf("failed to index knot record, invalid db cast") 800 } 801 802 // get record from db first 803 registrations, err := db.GetRegistrations( 804 ddb, 805 orm.FilterEq("domain", domain), 806 orm.FilterEq("did", did), 807 ) 808 if err != nil { 809 return fmt.Errorf("failed to get registration: %w", err) 810 } 811 if len(registrations) != 1 { 812 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 813 } 814 registration := registrations[0] 815 816 tx, err := ddb.Begin() 817 if err != nil { 818 return err 819 } 820 defer func() { 821 tx.Rollback() 822 i.Enforcer.E.LoadPolicy() 823 }() 824 825 err = db.DeleteKnot( 826 tx, 827 orm.FilterEq("did", did), 828 orm.FilterEq("domain", domain), 829 ) 830 if err != nil { 831 return err 832 } 833 834 if registration.Registered != nil { 835 err = i.Enforcer.RemoveKnot(domain) 836 if err != nil { 837 return err 838 } 839 } 840 841 err = tx.Commit() 842 if err != nil { 843 return err 844 } 845 846 err = i.Enforcer.E.SavePolicy() 847 if err != nil { 848 return err 849 } 850 } 851 852 return nil 853} 854func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 855 did := e.Did 856 rkey := e.Commit.RKey 857 858 var err error 859 860 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 861 l.Info("ingesting record") 862 863 ddb, ok := i.Db.Execer.(*db.DB) 864 if !ok { 865 return fmt.Errorf("failed to index issue record, invalid db cast") 866 } 867 868 switch e.Commit.Operation { 869 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 870 raw := json.RawMessage(e.Commit.Record) 871 record := tangled.RepoIssue{} 872 err = json.Unmarshal(raw, &record) 873 if err != nil { 874 l.Error("invalid record", "err", err) 875 return err 876 } 877 878 issue := models.IssueFromRecord(did, rkey, record) 879 880 if issue.RepoAt == "" { 881 return fmt.Errorf("issue record has no repo field") 882 } 883 884 if err := i.Validator.ValidateIssue(&issue); err != nil { 885 return fmt.Errorf("failed to validate issue: %w", err) 886 } 887 888 if record.Repo != nil { 889 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo) 890 if repoErr == nil && repo.RepoDid != "" { 891 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil { 892 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 893 } 894 } 895 } 896 897 tx, err := ddb.BeginTx(ctx, nil) 898 if err != nil { 899 l.Error("failed to begin transaction", "err", err) 900 return err 901 } 902 defer tx.Rollback() 903 904 err = db.PutIssue(tx, &issue) 905 if err != nil { 906 l.Error("failed to create issue", "err", err) 907 return err 908 } 909 910 err = tx.Commit() 911 if err != nil { 912 l.Error("failed to commit txn", "err", err) 913 return err 914 } 915 916 return nil 917 918 case jmodels.CommitOperationDelete: 919 tx, err := ddb.BeginTx(ctx, nil) 920 if err != nil { 921 l.Error("failed to begin transaction", "err", err) 922 return err 923 } 924 defer tx.Rollback() 925 926 if err := db.DeleteIssues( 927 tx, 928 did, 929 rkey, 930 ); err != nil { 931 l.Error("failed to delete", "err", err) 932 return fmt.Errorf("failed to delete issue record: %w", err) 933 } 934 if err := tx.Commit(); err != nil { 935 l.Error("failed to commit txn", "err", err) 936 return err 937 } 938 939 return nil 940 } 941 942 return nil 943} 944 945func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 946 did := e.Did 947 rkey := e.Commit.RKey 948 949 var err error 950 951 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 952 l.Info("ingesting record") 953 954 ddb, ok := i.Db.Execer.(*db.DB) 955 if !ok { 956 return fmt.Errorf("failed to index issue comment record, invalid db cast") 957 } 958 959 switch e.Commit.Operation { 960 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 961 raw := json.RawMessage(e.Commit.Record) 962 record := tangled.RepoIssueComment{} 963 err = json.Unmarshal(raw, &record) 964 if err != nil { 965 return fmt.Errorf("invalid record: %w", err) 966 } 967 968 comment, err := models.IssueCommentFromRecord(did, rkey, record) 969 if err != nil { 970 return fmt.Errorf("failed to parse comment from record: %w", err) 971 } 972 973 if err := i.Validator.ValidateIssueComment(comment); err != nil { 974 return fmt.Errorf("failed to validate comment: %w", err) 975 } 976 977 tx, err := ddb.Begin() 978 if err != nil { 979 return fmt.Errorf("failed to start transaction: %w", err) 980 } 981 defer tx.Rollback() 982 983 _, err = db.AddIssueComment(tx, *comment) 984 if err != nil { 985 return fmt.Errorf("failed to create issue comment: %w", err) 986 } 987 988 return tx.Commit() 989 990 case jmodels.CommitOperationDelete: 991 if err := db.DeleteIssueComments( 992 ddb, 993 orm.FilterEq("did", did), 994 orm.FilterEq("rkey", rkey), 995 ); err != nil { 996 return fmt.Errorf("failed to delete issue comment record: %w", err) 997 } 998 999 return nil 1000 } 1001 1002 return nil 1003} 1004 1005func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 1006 did := e.Did 1007 rkey := e.Commit.RKey 1008 1009 var err error 1010 1011 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1012 l.Info("ingesting record") 1013 1014 ddb, ok := i.Db.Execer.(*db.DB) 1015 if !ok { 1016 return fmt.Errorf("failed to index label definition, invalid db cast") 1017 } 1018 1019 switch e.Commit.Operation { 1020 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1021 raw := json.RawMessage(e.Commit.Record) 1022 record := tangled.LabelDefinition{} 1023 err = json.Unmarshal(raw, &record) 1024 if err != nil { 1025 return fmt.Errorf("invalid record: %w", err) 1026 } 1027 1028 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1029 if err != nil { 1030 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1031 } 1032 1033 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1034 return fmt.Errorf("failed to validate labeldef: %w", err) 1035 } 1036 1037 _, err = db.AddLabelDefinition(ddb, def) 1038 if err != nil { 1039 return fmt.Errorf("failed to create labeldef: %w", err) 1040 } 1041 1042 return nil 1043 1044 case jmodels.CommitOperationDelete: 1045 if err := db.DeleteLabelDefinition( 1046 ddb, 1047 orm.FilterEq("did", did), 1048 orm.FilterEq("rkey", rkey), 1049 ); err != nil { 1050 return fmt.Errorf("failed to delete labeldef record: %w", err) 1051 } 1052 1053 return nil 1054 } 1055 1056 return nil 1057} 1058 1059func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 1060 did := e.Did 1061 rkey := e.Commit.RKey 1062 1063 var err error 1064 1065 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1066 l.Info("ingesting record") 1067 1068 ddb, ok := i.Db.Execer.(*db.DB) 1069 if !ok { 1070 return fmt.Errorf("failed to index label op, invalid db cast") 1071 } 1072 1073 switch e.Commit.Operation { 1074 case jmodels.CommitOperationCreate: 1075 raw := json.RawMessage(e.Commit.Record) 1076 record := tangled.LabelOp{} 1077 err = json.Unmarshal(raw, &record) 1078 if err != nil { 1079 return fmt.Errorf("invalid record: %w", err) 1080 } 1081 1082 subject := syntax.ATURI(record.Subject) 1083 collection := subject.Collection() 1084 1085 var repo *models.Repo 1086 switch collection { 1087 case tangled.RepoIssueNSID: 1088 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1089 if err != nil || len(i) != 1 { 1090 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1091 } 1092 repo = i[0].Repo 1093 default: 1094 return fmt.Errorf("unsupport label subject: %s", collection) 1095 } 1096 1097 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1098 if err != nil { 1099 return fmt.Errorf("failed to build label application ctx: %w", err) 1100 } 1101 1102 ops := models.LabelOpsFromRecord(did, rkey, record) 1103 1104 for _, o := range ops { 1105 def, ok := actx.Defs[o.OperandKey] 1106 if !ok { 1107 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1108 } 1109 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1110 return fmt.Errorf("failed to validate labelop: %w", err) 1111 } 1112 } 1113 1114 tx, err := ddb.Begin() 1115 if err != nil { 1116 return err 1117 } 1118 defer tx.Rollback() 1119 1120 for _, o := range ops { 1121 _, err = db.AddLabelOp(tx, &o) 1122 if err != nil { 1123 return fmt.Errorf("failed to add labelop: %w", err) 1124 } 1125 } 1126 1127 if err = tx.Commit(); err != nil { 1128 return err 1129 } 1130 } 1131 1132 return nil 1133}