Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at intent-query-param 1400 lines 35 kB view raw
1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "net/http" 13 "net/url" 14 "slices" 15 "sync" 16 17 "time" 18 19 "github.com/avast/retry-go/v4" 20 "github.com/bluesky-social/indigo/atproto/syntax" 21 jmodels "github.com/bluesky-social/jetstream/pkg/models" 22 "github.com/go-git/go-git/v5/plumbing" 23 "github.com/ipfs/go-cid" 24 "golang.org/x/sync/errgroup" 25 "tangled.org/core/api/tangled" 26 "tangled.org/core/appview/cache" 27 "tangled.org/core/appview/config" 28 "tangled.org/core/appview/db" 29 "tangled.org/core/appview/models" 30 "tangled.org/core/appview/serververify" 31 "tangled.org/core/appview/validator" 32 "tangled.org/core/idresolver" 33 "tangled.org/core/orm" 34 "tangled.org/core/rbac" 35) 36 37type Ingester struct { 38 Db db.DbWrapper 39 Enforcer *rbac.Enforcer 40 IdResolver *idresolver.Resolver 41 Cache *cache.Cache 42 Config *config.Config 43 Logger *slog.Logger 44 Validator *validator.Validator 45} 46 47type processFunc func(ctx context.Context, e *jmodels.Event) error 48 49func (i *Ingester) Ingest() processFunc { 50 return func(ctx context.Context, e *jmodels.Event) error { 51 var err error 52 53 l := i.Logger.With("kind", e.Kind) 54 switch e.Kind { 55 case jmodels.EventKindAccount: 56 // TODO: sync account state to db 57 if e.Account.Active { 58 break 59 } 60 // TODO: revoke sessions by DID 61 if *e.Account.Status == "deactivated" { 62 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 63 } 64 case jmodels.EventKindIdentity: 65 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 66 case jmodels.EventKindCommit: 67 switch e.Commit.Collection { 68 case tangled.GraphFollowNSID: 69 err = i.ingestFollow(e) 70 case tangled.GraphVouchNSID: 71 err = i.ingestVouch(ctx, e) 72 case tangled.FeedStarNSID: 73 err = i.ingestStar(e) 74 case tangled.PublicKeyNSID: 75 err = i.ingestPublicKey(e) 76 case tangled.RepoArtifactNSID: 77 err = i.ingestArtifact(e) 78 case tangled.ActorProfileNSID: 79 err = i.ingestProfile(ctx, e) 80 case tangled.SpindleMemberNSID: 81 err = i.ingestSpindleMember(ctx, e) 82 case tangled.SpindleNSID: 83 err = i.ingestSpindle(ctx, e) 84 case tangled.KnotMemberNSID: 85 err = i.ingestKnotMember(e) 86 case tangled.KnotNSID: 87 err = i.ingestKnot(e) 88 case tangled.StringNSID: 89 err = i.ingestString(e) 90 case tangled.RepoIssueNSID: 91 err = i.ingestIssue(ctx, e) 92 case tangled.RepoPullNSID: 93 err = i.ingestPull(ctx, e) 94 case tangled.RepoIssueCommentNSID: 95 err = i.ingestIssueComment(e) 96 case tangled.LabelDefinitionNSID: 97 err = i.ingestLabelDefinition(e) 98 case tangled.LabelOpNSID: 99 err = i.ingestLabelOp(e) 100 } 101 l = i.Logger.With("nsid", e.Commit.Collection) 102 } 103 104 if err != nil { 105 l.Warn("failed to ingest record, skipping", "err", err) 106 } 107 108 lastTimeUs := e.TimeUS + 1 109 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 110 l.Error("failed to save cursor", "err", saveErr) 111 } 112 113 return nil 114 } 115} 116 117func (i *Ingester) ingestStar(e *jmodels.Event) error { 118 var err error 119 did := e.Did 120 121 l := i.Logger.With("handler", "ingestStar") 122 l = l.With("nsid", e.Commit.Collection) 123 124 switch e.Commit.Operation { 125 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 126 var subjectUri syntax.ATURI 127 128 raw := json.RawMessage(e.Commit.Record) 129 record := tangled.FeedStar{} 130 err := json.Unmarshal(raw, &record) 131 if err != nil { 132 l.Error("invalid record", "err", err) 133 return err 134 } 135 136 star := &models.Star{ 137 Did: did, 138 Rkey: e.Commit.RKey, 139 } 140 141 switch { 142 case record.SubjectDid != nil: 143 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid)) 144 if repoErr == nil { 145 subjectUri = repo.RepoAt() 146 star.RepoAt = subjectUri 147 } 148 case record.Subject != nil: 149 subjectUri, err = syntax.ParseATURI(*record.Subject) 150 if err != nil { 151 l.Error("invalid record", "err", err) 152 return err 153 } 154 star.RepoAt = subjectUri 155 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String()) 156 if repoErr == nil && repo.RepoDid != "" { 157 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil { 158 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 159 } 160 } 161 default: 162 l.Error("star record has neither subject nor subjectDid") 163 return fmt.Errorf("star record has neither subject nor subjectDid") 164 } 165 err = db.AddStar(i.Db, star) 166 case jmodels.CommitOperationDelete: 167 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 168 } 169 170 if err != nil { 171 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 172 } 173 174 return nil 175} 176 177func (i *Ingester) ingestFollow(e *jmodels.Event) error { 178 var err error 179 did := e.Did 180 181 l := i.Logger.With("handler", "ingestFollow") 182 l = l.With("nsid", e.Commit.Collection) 183 184 switch e.Commit.Operation { 185 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 186 raw := json.RawMessage(e.Commit.Record) 187 record := tangled.GraphFollow{} 188 err = json.Unmarshal(raw, &record) 189 if err != nil { 190 l.Error("invalid record", "err", err) 191 return err 192 } 193 194 err = db.AddFollow(i.Db, &models.Follow{ 195 UserDid: did, 196 SubjectDid: record.Subject, 197 Rkey: e.Commit.RKey, 198 }) 199 case jmodels.CommitOperationDelete: 200 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 201 } 202 203 if err != nil { 204 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 205 } 206 207 return nil 208} 209 210func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error { 211 var err error 212 did := e.Did 213 214 l := i.Logger.With("handler", "ingestVouch") 215 l = l.With("nsid", e.Commit.Collection) 216 l.Info("ingesting vouch") 217 218 switch e.Commit.Operation { 219 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 220 raw := json.RawMessage(e.Commit.Record) 221 record := tangled.GraphVouch{} 222 err = json.Unmarshal(raw, &record) 223 if err != nil { 224 l.Error("invalid record", "err", err) 225 return err 226 } 227 228 // rkey is the subject_did being vouched for/denounced 229 subjectDID := e.Commit.RKey 230 231 _, err = syntax.ParseDID(subjectDID) 232 if err != nil { 233 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID) 234 return fmt.Errorf("invalid subject_did: %w", err) 235 } 236 237 if did == subjectDID { 238 l.Warn("attempted self-vouch", "did", did) 239 return fmt.Errorf("cannot vouch for self") 240 } 241 242 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID) 243 if err != nil { 244 return err 245 } 246 247 if subjectId.Handle.IsInvalidHandle() { 248 return err 249 } 250 251 kind, err := models.ParseVouchKind(record.Kind) 252 if err != nil { 253 l.Error("invalid kind", "kind", kind) 254 return fmt.Errorf("invalid kind: %s", kind) 255 } 256 257 recordCid, err := cid.Parse(e.Commit.CID) 258 if err != nil { 259 l.Error("invalid cid", "err", err, "cid", e.Commit.CID) 260 return fmt.Errorf("invalid cid: %w", err) 261 } 262 263 err = db.AddVouch(i.Db, &models.Vouch{ 264 Did: syntax.DID(did), 265 SubjectDid: subjectId.DID, 266 Cid: recordCid, 267 Kind: kind, 268 Reason: record.Reason, 269 }) 270 271 case jmodels.CommitOperationDelete: 272 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey) 273 } 274 275 if err != nil { 276 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err) 277 } 278 279 return nil 280} 281 282func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 283 did := e.Did 284 var err error 285 286 l := i.Logger.With("handler", "ingestPublicKey") 287 l = l.With("nsid", e.Commit.Collection) 288 289 switch e.Commit.Operation { 290 case jmodels.CommitOperationCreate: 291 l.Debug("processing add of pubkey") 292 raw := json.RawMessage(e.Commit.Record) 293 record := tangled.PublicKey{} 294 err = json.Unmarshal(raw, &record) 295 if err != nil { 296 l.Error("invalid record", "err", err) 297 return err 298 } 299 300 name := record.Name 301 key := record.Key 302 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 303 case jmodels.CommitOperationUpdate: 304 l.Debug("processing update of pubkey") 305 raw := json.RawMessage(e.Commit.Record) 306 record := tangled.PublicKey{} 307 err = json.Unmarshal(raw, &record) 308 if err != nil { 309 l.Error("invalid record", "err", err) 310 return err 311 } 312 313 name := record.Name 314 key := record.Key 315 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey) 316 case jmodels.CommitOperationDelete: 317 l.Debug("processing delete of pubkey") 318 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 319 } 320 321 if err != nil { 322 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 323 } 324 325 return nil 326} 327 328func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 329 did := e.Did 330 var err error 331 332 l := i.Logger.With("handler", "ingestArtifact") 333 l = l.With("nsid", e.Commit.Collection) 334 335 switch e.Commit.Operation { 336 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 337 raw := json.RawMessage(e.Commit.Record) 338 record := tangled.RepoArtifact{} 339 err = json.Unmarshal(raw, &record) 340 if err != nil { 341 l.Error("invalid record", "err", err) 342 return err 343 } 344 345 var repo *models.Repo 346 if record.RepoDid != nil && *record.RepoDid != "" { 347 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 348 if err != nil && !errors.Is(err, sql.ErrNoRows) { 349 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 350 } 351 } 352 if repo == nil && record.Repo != nil { 353 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 354 if parseErr != nil { 355 return parseErr 356 } 357 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 358 if err != nil { 359 return err 360 } 361 } 362 if repo == nil { 363 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 364 } 365 366 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 367 if err != nil || !ok { 368 return err 369 } 370 371 repoDid := repo.RepoDid 372 if repoDid == "" && record.RepoDid != nil { 373 repoDid = *record.RepoDid 374 } 375 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 376 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil { 377 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 378 } 379 } 380 381 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 382 if err != nil { 383 createdAt = time.Now() 384 } 385 386 artifact := models.Artifact{ 387 Did: did, 388 Rkey: e.Commit.RKey, 389 RepoAt: repo.RepoAt(), 390 Tag: plumbing.Hash(record.Tag), 391 CreatedAt: createdAt, 392 BlobCid: cid.Cid(record.Artifact.Ref), 393 Name: record.Name, 394 Size: uint64(record.Artifact.Size), 395 MimeType: record.Artifact.MimeType, 396 } 397 398 err = db.AddArtifact(i.Db, artifact) 399 case jmodels.CommitOperationDelete: 400 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 401 } 402 403 if err != nil { 404 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 405 } 406 407 return nil 408} 409 410func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error { 411 did := e.Did 412 var err error 413 414 l := i.Logger.With("handler", "ingestProfile") 415 l = l.With("nsid", e.Commit.Collection) 416 417 if e.Commit.RKey != "self" { 418 return fmt.Errorf("ingestProfile only ingests `self` record") 419 } 420 421 switch e.Commit.Operation { 422 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 423 raw := json.RawMessage(e.Commit.Record) 424 record := tangled.ActorProfile{} 425 err = json.Unmarshal(raw, &record) 426 if err != nil { 427 l.Error("invalid record", "err", err) 428 return err 429 } 430 431 avatar := "" 432 if record.Avatar != nil { 433 avatar = record.Avatar.Ref.String() 434 } 435 436 description := "" 437 if record.Description != nil { 438 description = *record.Description 439 } 440 441 includeBluesky := record.Bluesky 442 443 pronouns := "" 444 if record.Pronouns != nil { 445 pronouns = *record.Pronouns 446 } 447 448 location := "" 449 if record.Location != nil { 450 location = *record.Location 451 } 452 453 var links [5]string 454 for i, l := range record.Links { 455 if i < 5 { 456 links[i] = l 457 } 458 } 459 460 var stats [2]models.VanityStat 461 for i, s := range record.Stats { 462 if i < 2 { 463 stats[i].Kind = models.ParseVanityStatKind(s) 464 } 465 } 466 467 var pinned [6]string 468 for i, r := range record.PinnedRepositories { 469 if i < 6 { 470 pinned[i] = r 471 } 472 } 473 474 var preferredHandle syntax.Handle 475 if record.PreferredHandle != nil { 476 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 477 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 478 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 479 preferredHandle = h 480 } 481 } 482 } 483 484 profile := models.Profile{ 485 Did: did, 486 Avatar: avatar, 487 Description: description, 488 IncludeBluesky: includeBluesky, 489 Location: location, 490 Links: links, 491 Stats: stats, 492 PinnedRepos: pinned, 493 Pronouns: pronouns, 494 PreferredHandle: preferredHandle, 495 } 496 497 ddb, ok := i.Db.Execer.(*db.DB) 498 if !ok { 499 return fmt.Errorf("failed to index profile record, invalid db cast") 500 } 501 502 tx, err := ddb.Begin() 503 if err != nil { 504 return fmt.Errorf("failed to start transaction") 505 } 506 507 err = db.ValidateProfile(tx, &profile) 508 if err != nil { 509 return fmt.Errorf("invalid profile record") 510 } 511 512 err = db.UpsertProfile(tx, &profile) 513 if err == nil && i.Cache != nil { 514 pipe := i.Cache.Pipeline() 515 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did) 516 if preferredHandle != "" { 517 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL) 518 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL) 519 } else { 520 pipe.Del(ctx, didKey) 521 } 522 if _, execErr := pipe.Exec(ctx); execErr != nil { 523 l.Warn("failed to update preferred handle cache", "err", execErr) 524 } 525 } 526 case jmodels.CommitOperationDelete: 527 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 528 } 529 530 if err != nil { 531 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 532 } 533 534 return nil 535} 536 537func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 538 did := e.Did 539 var err error 540 541 l := i.Logger.With("handler", "ingestSpindleMember") 542 l = l.With("nsid", e.Commit.Collection) 543 544 switch e.Commit.Operation { 545 case jmodels.CommitOperationCreate: 546 raw := json.RawMessage(e.Commit.Record) 547 record := tangled.SpindleMember{} 548 err = json.Unmarshal(raw, &record) 549 if err != nil { 550 l.Error("invalid record", "err", err) 551 return err 552 } 553 554 // only spindle owner can invite to spindles 555 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 556 if err != nil || !ok { 557 return fmt.Errorf("failed to enforce permissions: %w", err) 558 } 559 560 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 561 if err != nil { 562 return err 563 } 564 565 if memberId.Handle.IsInvalidHandle() { 566 return err 567 } 568 569 ddb, ok := i.Db.Execer.(*db.DB) 570 if !ok { 571 return fmt.Errorf("invalid db cast") 572 } 573 574 err = db.AddSpindleMember(ddb, models.SpindleMember{ 575 Did: syntax.DID(did), 576 Rkey: e.Commit.RKey, 577 Instance: record.Instance, 578 Subject: memberId.DID, 579 }) 580 if !ok { 581 return fmt.Errorf("failed to add to db: %w", err) 582 } 583 584 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 585 if err != nil { 586 return fmt.Errorf("failed to update ACLs: %w", err) 587 } 588 589 l.Info("added spindle member") 590 case jmodels.CommitOperationDelete: 591 rkey := e.Commit.RKey 592 593 ddb, ok := i.Db.Execer.(*db.DB) 594 if !ok { 595 return fmt.Errorf("failed to index profile record, invalid db cast") 596 } 597 598 // get record from db first 599 members, err := db.GetSpindleMembers( 600 ddb, 601 orm.FilterEq("did", did), 602 orm.FilterEq("rkey", rkey), 603 ) 604 if err != nil || len(members) != 1 { 605 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 606 } 607 member := members[0] 608 609 tx, err := ddb.Begin() 610 if err != nil { 611 return fmt.Errorf("failed to start txn: %w", err) 612 } 613 614 // remove record by rkey && update enforcer 615 if err = db.RemoveSpindleMember( 616 tx, 617 orm.FilterEq("did", did), 618 orm.FilterEq("rkey", rkey), 619 ); err != nil { 620 return fmt.Errorf("failed to remove from db: %w", err) 621 } 622 623 // update enforcer 624 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 625 if err != nil { 626 return fmt.Errorf("failed to update ACLs: %w", err) 627 } 628 629 if err = tx.Commit(); err != nil { 630 return fmt.Errorf("failed to commit txn: %w", err) 631 } 632 633 if err = i.Enforcer.E.SavePolicy(); err != nil { 634 return fmt.Errorf("failed to save ACLs: %w", err) 635 } 636 637 l.Info("removed spindle member") 638 } 639 640 return nil 641} 642 643func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 644 did := e.Did 645 var err error 646 647 l := i.Logger.With("handler", "ingestSpindle") 648 l = l.With("nsid", e.Commit.Collection) 649 650 switch e.Commit.Operation { 651 case jmodels.CommitOperationCreate: 652 raw := json.RawMessage(e.Commit.Record) 653 record := tangled.Spindle{} 654 err = json.Unmarshal(raw, &record) 655 if err != nil { 656 l.Error("invalid record", "err", err) 657 return err 658 } 659 660 instance := e.Commit.RKey 661 662 ddb, ok := i.Db.Execer.(*db.DB) 663 if !ok { 664 return fmt.Errorf("failed to index profile record, invalid db cast") 665 } 666 667 err := db.AddSpindle(ddb, models.Spindle{ 668 Owner: syntax.DID(did), 669 Instance: instance, 670 }) 671 if err != nil { 672 l.Error("failed to add spindle to db", "err", err, "instance", instance) 673 return err 674 } 675 676 err = retry.Do( 677 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 678 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 679 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 680 ) 681 if err != nil { 682 l.Error("failed to verify spindle after retries", "err", err, "instance", instance) 683 return err 684 } 685 686 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 687 if err != nil { 688 return fmt.Errorf("failed to mark verified: %w", err) 689 } 690 691 return nil 692 693 case jmodels.CommitOperationDelete: 694 instance := e.Commit.RKey 695 696 ddb, ok := i.Db.Execer.(*db.DB) 697 if !ok { 698 return fmt.Errorf("failed to index profile record, invalid db cast") 699 } 700 701 // get record from db first 702 spindles, err := db.GetSpindles( 703 ctx, 704 ddb, 705 orm.FilterEq("owner", did), 706 orm.FilterEq("instance", instance), 707 ) 708 if err != nil || len(spindles) != 1 { 709 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 710 } 711 spindle := spindles[0] 712 713 tx, err := ddb.Begin() 714 if err != nil { 715 return err 716 } 717 defer func() { 718 tx.Rollback() 719 i.Enforcer.E.LoadPolicy() 720 }() 721 722 // remove spindle members first 723 err = db.RemoveSpindleMember( 724 tx, 725 orm.FilterEq("owner", did), 726 orm.FilterEq("instance", instance), 727 ) 728 if err != nil { 729 return err 730 } 731 732 err = db.DeleteSpindle( 733 tx, 734 orm.FilterEq("owner", did), 735 orm.FilterEq("instance", instance), 736 ) 737 if err != nil { 738 return err 739 } 740 741 if spindle.Verified != nil { 742 err = i.Enforcer.RemoveSpindle(instance) 743 if err != nil { 744 return err 745 } 746 } 747 748 err = tx.Commit() 749 if err != nil { 750 return err 751 } 752 753 err = i.Enforcer.E.SavePolicy() 754 if err != nil { 755 return err 756 } 757 } 758 759 return nil 760} 761 762func (i *Ingester) ingestString(e *jmodels.Event) error { 763 did := e.Did 764 rkey := e.Commit.RKey 765 766 var err error 767 768 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 769 l.Info("ingesting record") 770 771 ddb, ok := i.Db.Execer.(*db.DB) 772 if !ok { 773 return fmt.Errorf("failed to index string record, invalid db cast") 774 } 775 776 switch e.Commit.Operation { 777 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 778 raw := json.RawMessage(e.Commit.Record) 779 record := tangled.String{} 780 err = json.Unmarshal(raw, &record) 781 if err != nil { 782 l.Error("invalid record", "err", err) 783 return err 784 } 785 786 string := models.StringFromRecord(did, rkey, record) 787 788 if err = i.Validator.ValidateString(&string); err != nil { 789 l.Error("invalid record", "err", err) 790 return err 791 } 792 793 if err = db.AddString(ddb, string); err != nil { 794 l.Error("failed to add string", "err", err) 795 return err 796 } 797 798 return nil 799 800 case jmodels.CommitOperationDelete: 801 if err := db.DeleteString( 802 ddb, 803 orm.FilterEq("did", did), 804 orm.FilterEq("rkey", rkey), 805 ); err != nil { 806 l.Error("failed to delete", "err", err) 807 return fmt.Errorf("failed to delete string record: %w", err) 808 } 809 810 return nil 811 } 812 813 return nil 814} 815 816func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 817 did := e.Did 818 var err error 819 820 l := i.Logger.With("handler", "ingestKnotMember") 821 l = l.With("nsid", e.Commit.Collection) 822 823 switch e.Commit.Operation { 824 case jmodels.CommitOperationCreate: 825 raw := json.RawMessage(e.Commit.Record) 826 record := tangled.KnotMember{} 827 err = json.Unmarshal(raw, &record) 828 if err != nil { 829 l.Error("invalid record", "err", err) 830 return err 831 } 832 833 // only knot owner can invite to knots 834 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 835 if err != nil || !ok { 836 return fmt.Errorf("failed to enforce permissions: %w", err) 837 } 838 839 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 840 if err != nil { 841 return err 842 } 843 844 if memberId.Handle.IsInvalidHandle() { 845 return err 846 } 847 848 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 849 if err != nil { 850 return fmt.Errorf("failed to update ACLs: %w", err) 851 } 852 853 l.Info("added knot member") 854 case jmodels.CommitOperationDelete: 855 // we don't store knot members in a table (like we do for spindle) 856 // and we can't remove this just yet. possibly fixed if we switch 857 // to either: 858 // 1. a knot_members table like with spindle and store the rkey 859 // 2. use the knot host as the rkey 860 // 861 // TODO: implement member deletion 862 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 863 } 864 865 return nil 866} 867 868func (i *Ingester) ingestKnot(e *jmodels.Event) error { 869 did := e.Did 870 var err error 871 872 l := i.Logger.With("handler", "ingestKnot") 873 l = l.With("nsid", e.Commit.Collection) 874 875 switch e.Commit.Operation { 876 case jmodels.CommitOperationCreate: 877 raw := json.RawMessage(e.Commit.Record) 878 record := tangled.Knot{} 879 err = json.Unmarshal(raw, &record) 880 if err != nil { 881 l.Error("invalid record", "err", err) 882 return err 883 } 884 885 domain := e.Commit.RKey 886 887 ddb, ok := i.Db.Execer.(*db.DB) 888 if !ok { 889 return fmt.Errorf("failed to index profile record, invalid db cast") 890 } 891 892 err := db.AddKnot(ddb, domain, did) 893 if err != nil { 894 l.Error("failed to add knot to db", "err", err, "domain", domain) 895 return err 896 } 897 898 err = retry.Do( 899 func() error { 900 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 901 }, 902 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 903 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 904 ) 905 if err != nil { 906 l.Error("failed to verify knot after retries", "err", err, "domain", domain) 907 return err 908 } 909 910 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 911 if err != nil { 912 return fmt.Errorf("failed to mark verified: %w", err) 913 } 914 915 return nil 916 917 case jmodels.CommitOperationDelete: 918 domain := e.Commit.RKey 919 920 ddb, ok := i.Db.Execer.(*db.DB) 921 if !ok { 922 return fmt.Errorf("failed to index knot record, invalid db cast") 923 } 924 925 // get record from db first 926 registrations, err := db.GetRegistrations( 927 ddb, 928 orm.FilterEq("domain", domain), 929 orm.FilterEq("did", did), 930 ) 931 if err != nil { 932 return fmt.Errorf("failed to get registration: %w", err) 933 } 934 if len(registrations) != 1 { 935 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations)) 936 } 937 registration := registrations[0] 938 939 tx, err := ddb.Begin() 940 if err != nil { 941 return err 942 } 943 defer func() { 944 tx.Rollback() 945 i.Enforcer.E.LoadPolicy() 946 }() 947 948 err = db.DeleteKnot( 949 tx, 950 orm.FilterEq("did", did), 951 orm.FilterEq("domain", domain), 952 ) 953 if err != nil { 954 return err 955 } 956 957 if registration.Registered != nil { 958 err = i.Enforcer.RemoveKnot(domain) 959 if err != nil { 960 return err 961 } 962 } 963 964 err = tx.Commit() 965 if err != nil { 966 return err 967 } 968 969 err = i.Enforcer.E.SavePolicy() 970 if err != nil { 971 return err 972 } 973 } 974 975 return nil 976} 977func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 978 did := e.Did 979 rkey := e.Commit.RKey 980 981 var err error 982 983 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 984 l.Info("ingesting record") 985 986 ddb, ok := i.Db.Execer.(*db.DB) 987 if !ok { 988 return fmt.Errorf("failed to index issue record, invalid db cast") 989 } 990 991 switch e.Commit.Operation { 992 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 993 raw := json.RawMessage(e.Commit.Record) 994 record := tangled.RepoIssue{} 995 err = json.Unmarshal(raw, &record) 996 if err != nil { 997 l.Error("invalid record", "err", err) 998 return err 999 } 1000 1001 issue := models.IssueFromRecord(did, rkey, record) 1002 1003 if issue.RepoAt == "" { 1004 return fmt.Errorf("issue record has no repo field") 1005 } 1006 1007 if err := i.Validator.ValidateIssue(&issue); err != nil { 1008 return fmt.Errorf("failed to validate issue: %w", err) 1009 } 1010 1011 if record.Repo != nil { 1012 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo) 1013 if repoErr == nil && repo.RepoDid != "" { 1014 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil { 1015 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 1016 } 1017 } 1018 } 1019 1020 tx, err := ddb.BeginTx(ctx, nil) 1021 if err != nil { 1022 l.Error("failed to begin transaction", "err", err) 1023 return err 1024 } 1025 defer tx.Rollback() 1026 1027 err = db.PutIssue(tx, &issue) 1028 if err != nil { 1029 l.Error("failed to create issue", "err", err) 1030 return err 1031 } 1032 1033 err = tx.Commit() 1034 if err != nil { 1035 l.Error("failed to commit txn", "err", err) 1036 return err 1037 } 1038 1039 return nil 1040 1041 case jmodels.CommitOperationDelete: 1042 tx, err := ddb.BeginTx(ctx, nil) 1043 if err != nil { 1044 l.Error("failed to begin transaction", "err", err) 1045 return err 1046 } 1047 defer tx.Rollback() 1048 1049 if err := db.DeleteIssues( 1050 tx, 1051 did, 1052 rkey, 1053 ); err != nil { 1054 l.Error("failed to delete", "err", err) 1055 return fmt.Errorf("failed to delete issue record: %w", err) 1056 } 1057 if err := tx.Commit(); err != nil { 1058 l.Error("failed to commit txn", "err", err) 1059 return err 1060 } 1061 1062 return nil 1063 } 1064 1065 return nil 1066} 1067 1068func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error { 1069 did := e.Did 1070 rkey := e.Commit.RKey 1071 1072 var err error 1073 1074 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1075 l.Info("ingesting record") 1076 1077 ddb, ok := i.Db.Execer.(*db.DB) 1078 if !ok { 1079 return fmt.Errorf("failed to index pull record, invalid db cast") 1080 } 1081 1082 switch e.Commit.Operation { 1083 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1084 raw := json.RawMessage(e.Commit.Record) 1085 record := tangled.RepoPull{} 1086 err = json.Unmarshal(raw, &record) 1087 if err != nil { 1088 l.Error("invalid record", "err", err) 1089 return err 1090 } 1091 1092 ownerId, err := i.IdResolver.ResolveIdent(ctx, did) 1093 if err != nil { 1094 l.Error("failed to resolve did") 1095 return err 1096 } 1097 1098 // go through and fetch all blobs in parallel 1099 readers := make([]*io.ReadCloser, len(record.Rounds)) 1100 var mu sync.Mutex 1101 1102 g, gctx := errgroup.WithContext(ctx) 1103 1104 for idx, b := range record.Rounds { 1105 g.Go(func() error { 1106 // for some reason, a blob is empty 1107 if b.PatchBlob == nil { 1108 return fmt.Errorf("missing patchBlob in round %d", idx) 1109 } 1110 1111 ownerPds := ownerId.PDSEndpoint() 1112 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 1113 q := url.Query() 1114 q.Set("cid", b.PatchBlob.Ref.String()) 1115 q.Set("did", did) 1116 url.RawQuery = q.Encode() 1117 1118 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil) 1119 if err != nil { 1120 l.Error("failed to create request") 1121 return err 1122 } 1123 req.Header.Set("Content-Type", "application/json") 1124 1125 resp, err := http.DefaultClient.Do(req) 1126 if err != nil { 1127 l.Error("failed to make request") 1128 return err 1129 } 1130 1131 mu.Lock() 1132 readers[idx] = &resp.Body 1133 mu.Unlock() 1134 1135 return nil 1136 }) 1137 } 1138 1139 if err := g.Wait(); err != nil { 1140 for _, r := range readers { 1141 if r != nil && *r != nil { 1142 (*r).Close() 1143 } 1144 } 1145 return err 1146 } 1147 1148 defer func() { 1149 for _, r := range readers { 1150 if r != nil && *r != nil { 1151 (*r).Close() 1152 } 1153 } 1154 }() 1155 1156 pull, err := models.PullFromRecord(did, rkey, record, readers) 1157 if err != nil { 1158 return fmt.Errorf("failed to parse pull from record: %w", err) 1159 } 1160 if err := i.Validator.ValidatePull(pull); err != nil { 1161 return fmt.Errorf("failed to validate pull: %w", err) 1162 } 1163 1164 tx, err := ddb.BeginTx(ctx, nil) 1165 if err != nil { 1166 l.Error("failed to begin transaction", "err", err) 1167 return err 1168 } 1169 defer tx.Rollback() 1170 1171 err = db.PutPull(tx, pull) 1172 if err != nil { 1173 l.Error("failed to create pull", "err", err) 1174 return err 1175 } 1176 1177 err = tx.Commit() 1178 if err != nil { 1179 l.Error("failed to commit txn", "err", err) 1180 return err 1181 } 1182 1183 return nil 1184 1185 case jmodels.CommitOperationDelete: 1186 tx, err := ddb.BeginTx(ctx, nil) 1187 if err != nil { 1188 l.Error("failed to begin transaction", "err", err) 1189 return err 1190 } 1191 defer tx.Rollback() 1192 1193 if err := db.AbandonPulls( 1194 tx, 1195 orm.FilterEq("owner_did", did), 1196 orm.FilterEq("rkey", rkey), 1197 ); err != nil { 1198 l.Error("failed to abandon", "err", err) 1199 return fmt.Errorf("failed to abandon pull record: %w", err) 1200 } 1201 if err := tx.Commit(); err != nil { 1202 l.Error("failed to commit txn", "err", err) 1203 return err 1204 } 1205 1206 return nil 1207 } 1208 1209 return nil 1210} 1211 1212func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 1213 did := e.Did 1214 rkey := e.Commit.RKey 1215 1216 var err error 1217 1218 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1219 l.Info("ingesting record") 1220 1221 ddb, ok := i.Db.Execer.(*db.DB) 1222 if !ok { 1223 return fmt.Errorf("failed to index issue comment record, invalid db cast") 1224 } 1225 1226 switch e.Commit.Operation { 1227 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1228 raw := json.RawMessage(e.Commit.Record) 1229 record := tangled.RepoIssueComment{} 1230 err = json.Unmarshal(raw, &record) 1231 if err != nil { 1232 return fmt.Errorf("invalid record: %w", err) 1233 } 1234 1235 comment, err := models.IssueCommentFromRecord(did, rkey, record) 1236 if err != nil { 1237 return fmt.Errorf("failed to parse comment from record: %w", err) 1238 } 1239 1240 if err := i.Validator.ValidateIssueComment(comment); err != nil { 1241 return fmt.Errorf("failed to validate comment: %w", err) 1242 } 1243 1244 tx, err := ddb.Begin() 1245 if err != nil { 1246 return fmt.Errorf("failed to start transaction: %w", err) 1247 } 1248 defer tx.Rollback() 1249 1250 _, err = db.AddIssueComment(tx, *comment) 1251 if err != nil { 1252 return fmt.Errorf("failed to create issue comment: %w", err) 1253 } 1254 1255 return tx.Commit() 1256 1257 case jmodels.CommitOperationDelete: 1258 if err := db.DeleteIssueComments( 1259 ddb, 1260 orm.FilterEq("did", did), 1261 orm.FilterEq("rkey", rkey), 1262 ); err != nil { 1263 return fmt.Errorf("failed to delete issue comment record: %w", err) 1264 } 1265 1266 return nil 1267 } 1268 1269 return nil 1270} 1271 1272func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 1273 did := e.Did 1274 rkey := e.Commit.RKey 1275 1276 var err error 1277 1278 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1279 l.Info("ingesting record") 1280 1281 ddb, ok := i.Db.Execer.(*db.DB) 1282 if !ok { 1283 return fmt.Errorf("failed to index label definition, invalid db cast") 1284 } 1285 1286 switch e.Commit.Operation { 1287 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1288 raw := json.RawMessage(e.Commit.Record) 1289 record := tangled.LabelDefinition{} 1290 err = json.Unmarshal(raw, &record) 1291 if err != nil { 1292 return fmt.Errorf("invalid record: %w", err) 1293 } 1294 1295 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1296 if err != nil { 1297 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1298 } 1299 1300 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1301 return fmt.Errorf("failed to validate labeldef: %w", err) 1302 } 1303 1304 _, err = db.AddLabelDefinition(ddb, def) 1305 if err != nil { 1306 return fmt.Errorf("failed to create labeldef: %w", err) 1307 } 1308 1309 return nil 1310 1311 case jmodels.CommitOperationDelete: 1312 if err := db.DeleteLabelDefinition( 1313 ddb, 1314 orm.FilterEq("did", did), 1315 orm.FilterEq("rkey", rkey), 1316 ); err != nil { 1317 return fmt.Errorf("failed to delete labeldef record: %w", err) 1318 } 1319 1320 return nil 1321 } 1322 1323 return nil 1324} 1325 1326func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 1327 did := e.Did 1328 rkey := e.Commit.RKey 1329 1330 var err error 1331 1332 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1333 l.Info("ingesting record") 1334 1335 ddb, ok := i.Db.Execer.(*db.DB) 1336 if !ok { 1337 return fmt.Errorf("failed to index label op, invalid db cast") 1338 } 1339 1340 switch e.Commit.Operation { 1341 case jmodels.CommitOperationCreate: 1342 raw := json.RawMessage(e.Commit.Record) 1343 record := tangled.LabelOp{} 1344 err = json.Unmarshal(raw, &record) 1345 if err != nil { 1346 return fmt.Errorf("invalid record: %w", err) 1347 } 1348 1349 subject := syntax.ATURI(record.Subject) 1350 collection := subject.Collection() 1351 1352 var repo *models.Repo 1353 switch collection { 1354 case tangled.RepoIssueNSID: 1355 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1356 if err != nil || len(i) != 1 { 1357 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1358 } 1359 repo = i[0].Repo 1360 default: 1361 return fmt.Errorf("unsupported label subject: %s", collection) 1362 } 1363 1364 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1365 if err != nil { 1366 return fmt.Errorf("failed to build label application ctx: %w", err) 1367 } 1368 1369 ops := models.LabelOpsFromRecord(did, rkey, record) 1370 1371 for _, o := range ops { 1372 def, ok := actx.Defs[o.OperandKey] 1373 if !ok { 1374 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1375 } 1376 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1377 return fmt.Errorf("failed to validate labelop: %w", err) 1378 } 1379 } 1380 1381 tx, err := ddb.Begin() 1382 if err != nil { 1383 return err 1384 } 1385 defer tx.Rollback() 1386 1387 for _, o := range ops { 1388 _, err = db.AddLabelOp(tx, &o) 1389 if err != nil { 1390 return fmt.Errorf("failed to add labelop: %w", err) 1391 } 1392 } 1393 1394 if err = tx.Commit(); err != nil { 1395 return err 1396 } 1397 } 1398 1399 return nil 1400}