forked from
tangled.org/core
Monorepo for Tangled
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "net/http"
13 "net/url"
14 "slices"
15 "sync"
16
17 "time"
18
19 "github.com/avast/retry-go/v4"
20 "github.com/bluesky-social/indigo/atproto/syntax"
21 jmodels "github.com/bluesky-social/jetstream/pkg/models"
22 "github.com/go-git/go-git/v5/plumbing"
23 "github.com/ipfs/go-cid"
24 "golang.org/x/sync/errgroup"
25 "tangled.org/core/api/tangled"
26 "tangled.org/core/appview/config"
27 "tangled.org/core/appview/db"
28 "tangled.org/core/appview/models"
29 "tangled.org/core/appview/serververify"
30 "tangled.org/core/appview/validator"
31 "tangled.org/core/idresolver"
32 "tangled.org/core/orm"
33 "tangled.org/core/rbac"
34)
35
36type Ingester struct {
37 Db db.DbWrapper
38 Enforcer *rbac.Enforcer
39 IdResolver *idresolver.Resolver
40 Config *config.Config
41 Logger *slog.Logger
42 Validator *validator.Validator
43}
44
45type processFunc func(ctx context.Context, e *jmodels.Event) error
46
47func (i *Ingester) Ingest() processFunc {
48 return func(ctx context.Context, e *jmodels.Event) error {
49 var err error
50
51 l := i.Logger.With("kind", e.Kind)
52 switch e.Kind {
53 case jmodels.EventKindAccount:
54 // TODO: sync account state to db
55 if e.Account.Active {
56 break
57 }
58 // TODO: revoke sessions by DID
59 if *e.Account.Status == "deactivated" {
60 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
61 }
62 case jmodels.EventKindIdentity:
63 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
64 case jmodels.EventKindCommit:
65 switch e.Commit.Collection {
66 case tangled.GraphFollowNSID:
67 err = i.ingestFollow(e)
68 case tangled.FeedStarNSID:
69 err = i.ingestStar(e)
70 case tangled.PublicKeyNSID:
71 err = i.ingestPublicKey(e)
72 case tangled.RepoArtifactNSID:
73 err = i.ingestArtifact(e)
74 case tangled.ActorProfileNSID:
75 err = i.ingestProfile(ctx, e)
76 case tangled.SpindleMemberNSID:
77 err = i.ingestSpindleMember(ctx, e)
78 case tangled.SpindleNSID:
79 err = i.ingestSpindle(ctx, e)
80 case tangled.KnotMemberNSID:
81 err = i.ingestKnotMember(e)
82 case tangled.KnotNSID:
83 err = i.ingestKnot(e)
84 case tangled.StringNSID:
85 err = i.ingestString(e)
86 case tangled.RepoIssueNSID:
87 err = i.ingestIssue(ctx, e)
88 case tangled.RepoPullNSID:
89 err = i.ingestPull(ctx, e)
90 case tangled.RepoIssueCommentNSID:
91 err = i.ingestIssueComment(e)
92 case tangled.LabelDefinitionNSID:
93 err = i.ingestLabelDefinition(e)
94 case tangled.LabelOpNSID:
95 err = i.ingestLabelOp(e)
96 }
97 l = i.Logger.With("nsid", e.Commit.Collection)
98 }
99
100 if err != nil {
101 l.Warn("failed to ingest record, skipping", "err", err)
102 }
103
104 lastTimeUs := e.TimeUS + 1
105 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
106 l.Error("failed to save cursor", "err", saveErr)
107 }
108
109 return nil
110 }
111}
112
113func (i *Ingester) ingestStar(e *jmodels.Event) error {
114 var err error
115 did := e.Did
116
117 l := i.Logger.With("handler", "ingestStar")
118 l = l.With("nsid", e.Commit.Collection)
119
120 switch e.Commit.Operation {
121 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
122 var subjectUri syntax.ATURI
123
124 raw := json.RawMessage(e.Commit.Record)
125 record := tangled.FeedStar{}
126 err := json.Unmarshal(raw, &record)
127 if err != nil {
128 l.Error("invalid record", "err", err)
129 return err
130 }
131
132 star := &models.Star{
133 Did: did,
134 Rkey: e.Commit.RKey,
135 }
136
137 switch {
138 case record.SubjectDid != nil:
139 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid))
140 if repoErr == nil {
141 subjectUri = repo.RepoAt()
142 star.RepoAt = subjectUri
143 }
144 case record.Subject != nil:
145 subjectUri, err = syntax.ParseATURI(*record.Subject)
146 if err != nil {
147 l.Error("invalid record", "err", err)
148 return err
149 }
150 star.RepoAt = subjectUri
151 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String())
152 if repoErr == nil && repo.RepoDid != "" {
153 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil {
154 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
155 }
156 }
157 default:
158 l.Error("star record has neither subject nor subjectDid")
159 return fmt.Errorf("star record has neither subject nor subjectDid")
160 }
161 err = db.AddStar(i.Db, star)
162 case jmodels.CommitOperationDelete:
163 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
164 }
165
166 if err != nil {
167 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
168 }
169
170 return nil
171}
172
173func (i *Ingester) ingestFollow(e *jmodels.Event) error {
174 var err error
175 did := e.Did
176
177 l := i.Logger.With("handler", "ingestFollow")
178 l = l.With("nsid", e.Commit.Collection)
179
180 switch e.Commit.Operation {
181 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
182 raw := json.RawMessage(e.Commit.Record)
183 record := tangled.GraphFollow{}
184 err = json.Unmarshal(raw, &record)
185 if err != nil {
186 l.Error("invalid record", "err", err)
187 return err
188 }
189
190 err = db.AddFollow(i.Db, &models.Follow{
191 UserDid: did,
192 SubjectDid: record.Subject,
193 Rkey: e.Commit.RKey,
194 })
195 case jmodels.CommitOperationDelete:
196 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
197 }
198
199 if err != nil {
200 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
201 }
202
203 return nil
204}
205
206func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
207 did := e.Did
208 var err error
209
210 l := i.Logger.With("handler", "ingestPublicKey")
211 l = l.With("nsid", e.Commit.Collection)
212
213 switch e.Commit.Operation {
214 case jmodels.CommitOperationCreate:
215 l.Debug("processing add of pubkey")
216 raw := json.RawMessage(e.Commit.Record)
217 record := tangled.PublicKey{}
218 err = json.Unmarshal(raw, &record)
219 if err != nil {
220 l.Error("invalid record", "err", err)
221 return err
222 }
223
224 name := record.Name
225 key := record.Key
226 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
227 case jmodels.CommitOperationUpdate:
228 l.Debug("processing update of pubkey")
229 raw := json.RawMessage(e.Commit.Record)
230 record := tangled.PublicKey{}
231 err = json.Unmarshal(raw, &record)
232 if err != nil {
233 l.Error("invalid record", "err", err)
234 return err
235 }
236
237 name := record.Name
238 key := record.Key
239 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey)
240 case jmodels.CommitOperationDelete:
241 l.Debug("processing delete of pubkey")
242 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
243 }
244
245 if err != nil {
246 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
247 }
248
249 return nil
250}
251
252func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
253 did := e.Did
254 var err error
255
256 l := i.Logger.With("handler", "ingestArtifact")
257 l = l.With("nsid", e.Commit.Collection)
258
259 switch e.Commit.Operation {
260 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
261 raw := json.RawMessage(e.Commit.Record)
262 record := tangled.RepoArtifact{}
263 err = json.Unmarshal(raw, &record)
264 if err != nil {
265 l.Error("invalid record", "err", err)
266 return err
267 }
268
269 var repo *models.Repo
270 if record.RepoDid != nil && *record.RepoDid != "" {
271 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
272 if err != nil && !errors.Is(err, sql.ErrNoRows) {
273 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
274 }
275 }
276 if repo == nil && record.Repo != nil {
277 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
278 if parseErr != nil {
279 return parseErr
280 }
281 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
282 if err != nil {
283 return err
284 }
285 }
286 if repo == nil {
287 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
288 }
289
290 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push")
291 if err != nil || !ok {
292 return err
293 }
294
295 repoDid := repo.RepoDid
296 if repoDid == "" && record.RepoDid != nil {
297 repoDid = *record.RepoDid
298 }
299 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
300 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil {
301 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
302 }
303 }
304
305 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
306 if err != nil {
307 createdAt = time.Now()
308 }
309
310 artifact := models.Artifact{
311 Did: did,
312 Rkey: e.Commit.RKey,
313 RepoAt: repo.RepoAt(),
314 Tag: plumbing.Hash(record.Tag),
315 CreatedAt: createdAt,
316 BlobCid: cid.Cid(record.Artifact.Ref),
317 Name: record.Name,
318 Size: uint64(record.Artifact.Size),
319 MimeType: record.Artifact.MimeType,
320 }
321
322 err = db.AddArtifact(i.Db, artifact)
323 case jmodels.CommitOperationDelete:
324 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
325 }
326
327 if err != nil {
328 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
329 }
330
331 return nil
332}
333
334func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error {
335 did := e.Did
336 var err error
337
338 l := i.Logger.With("handler", "ingestProfile")
339 l = l.With("nsid", e.Commit.Collection)
340
341 if e.Commit.RKey != "self" {
342 return fmt.Errorf("ingestProfile only ingests `self` record")
343 }
344
345 switch e.Commit.Operation {
346 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
347 raw := json.RawMessage(e.Commit.Record)
348 record := tangled.ActorProfile{}
349 err = json.Unmarshal(raw, &record)
350 if err != nil {
351 l.Error("invalid record", "err", err)
352 return err
353 }
354
355 avatar := ""
356 if record.Avatar != nil {
357 avatar = record.Avatar.Ref.String()
358 }
359
360 description := ""
361 if record.Description != nil {
362 description = *record.Description
363 }
364
365 includeBluesky := record.Bluesky
366
367 pronouns := ""
368 if record.Pronouns != nil {
369 pronouns = *record.Pronouns
370 }
371
372 location := ""
373 if record.Location != nil {
374 location = *record.Location
375 }
376
377 var links [5]string
378 for i, l := range record.Links {
379 if i < 5 {
380 links[i] = l
381 }
382 }
383
384 var stats [2]models.VanityStat
385 for i, s := range record.Stats {
386 if i < 2 {
387 stats[i].Kind = models.ParseVanityStatKind(s)
388 }
389 }
390
391 var pinned [6]string
392 for i, r := range record.PinnedRepositories {
393 if i < 6 {
394 pinned[i] = r
395 }
396 }
397
398 var preferredHandle syntax.Handle
399 if record.PreferredHandle != nil {
400 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
401 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
402 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
403 preferredHandle = h
404 }
405 }
406 }
407
408 profile := models.Profile{
409 Did: did,
410 Avatar: avatar,
411 Description: description,
412 IncludeBluesky: includeBluesky,
413 Location: location,
414 Links: links,
415 Stats: stats,
416 PinnedRepos: pinned,
417 Pronouns: pronouns,
418 PreferredHandle: preferredHandle,
419 }
420
421 ddb, ok := i.Db.Execer.(*db.DB)
422 if !ok {
423 return fmt.Errorf("failed to index profile record, invalid db cast")
424 }
425
426 tx, err := ddb.Begin()
427 if err != nil {
428 return fmt.Errorf("failed to start transaction")
429 }
430
431 err = db.ValidateProfile(tx, &profile)
432 if err != nil {
433 return fmt.Errorf("invalid profile record")
434 }
435
436 err = db.UpsertProfile(tx, &profile)
437 case jmodels.CommitOperationDelete:
438 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
439 }
440
441 if err != nil {
442 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
443 }
444
445 return nil
446}
447
448func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
449 did := e.Did
450 var err error
451
452 l := i.Logger.With("handler", "ingestSpindleMember")
453 l = l.With("nsid", e.Commit.Collection)
454
455 switch e.Commit.Operation {
456 case jmodels.CommitOperationCreate:
457 raw := json.RawMessage(e.Commit.Record)
458 record := tangled.SpindleMember{}
459 err = json.Unmarshal(raw, &record)
460 if err != nil {
461 l.Error("invalid record", "err", err)
462 return err
463 }
464
465 // only spindle owner can invite to spindles
466 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
467 if err != nil || !ok {
468 return fmt.Errorf("failed to enforce permissions: %w", err)
469 }
470
471 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
472 if err != nil {
473 return err
474 }
475
476 if memberId.Handle.IsInvalidHandle() {
477 return err
478 }
479
480 ddb, ok := i.Db.Execer.(*db.DB)
481 if !ok {
482 return fmt.Errorf("invalid db cast")
483 }
484
485 err = db.AddSpindleMember(ddb, models.SpindleMember{
486 Did: syntax.DID(did),
487 Rkey: e.Commit.RKey,
488 Instance: record.Instance,
489 Subject: memberId.DID,
490 })
491 if !ok {
492 return fmt.Errorf("failed to add to db: %w", err)
493 }
494
495 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
496 if err != nil {
497 return fmt.Errorf("failed to update ACLs: %w", err)
498 }
499
500 l.Info("added spindle member")
501 case jmodels.CommitOperationDelete:
502 rkey := e.Commit.RKey
503
504 ddb, ok := i.Db.Execer.(*db.DB)
505 if !ok {
506 return fmt.Errorf("failed to index profile record, invalid db cast")
507 }
508
509 // get record from db first
510 members, err := db.GetSpindleMembers(
511 ddb,
512 orm.FilterEq("did", did),
513 orm.FilterEq("rkey", rkey),
514 )
515 if err != nil || len(members) != 1 {
516 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
517 }
518 member := members[0]
519
520 tx, err := ddb.Begin()
521 if err != nil {
522 return fmt.Errorf("failed to start txn: %w", err)
523 }
524
525 // remove record by rkey && update enforcer
526 if err = db.RemoveSpindleMember(
527 tx,
528 orm.FilterEq("did", did),
529 orm.FilterEq("rkey", rkey),
530 ); err != nil {
531 return fmt.Errorf("failed to remove from db: %w", err)
532 }
533
534 // update enforcer
535 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
536 if err != nil {
537 return fmt.Errorf("failed to update ACLs: %w", err)
538 }
539
540 if err = tx.Commit(); err != nil {
541 return fmt.Errorf("failed to commit txn: %w", err)
542 }
543
544 if err = i.Enforcer.E.SavePolicy(); err != nil {
545 return fmt.Errorf("failed to save ACLs: %w", err)
546 }
547
548 l.Info("removed spindle member")
549 }
550
551 return nil
552}
553
554func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
555 did := e.Did
556 var err error
557
558 l := i.Logger.With("handler", "ingestSpindle")
559 l = l.With("nsid", e.Commit.Collection)
560
561 switch e.Commit.Operation {
562 case jmodels.CommitOperationCreate:
563 raw := json.RawMessage(e.Commit.Record)
564 record := tangled.Spindle{}
565 err = json.Unmarshal(raw, &record)
566 if err != nil {
567 l.Error("invalid record", "err", err)
568 return err
569 }
570
571 instance := e.Commit.RKey
572
573 ddb, ok := i.Db.Execer.(*db.DB)
574 if !ok {
575 return fmt.Errorf("failed to index profile record, invalid db cast")
576 }
577
578 err := db.AddSpindle(ddb, models.Spindle{
579 Owner: syntax.DID(did),
580 Instance: instance,
581 })
582 if err != nil {
583 l.Error("failed to add spindle to db", "err", err, "instance", instance)
584 return err
585 }
586
587 err = retry.Do(
588 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
589 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
590 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
591 )
592 if err != nil {
593 l.Error("failed to verify spindle after retries", "err", err, "instance", instance)
594 return err
595 }
596
597 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
598 if err != nil {
599 return fmt.Errorf("failed to mark verified: %w", err)
600 }
601
602 return nil
603
604 case jmodels.CommitOperationDelete:
605 instance := e.Commit.RKey
606
607 ddb, ok := i.Db.Execer.(*db.DB)
608 if !ok {
609 return fmt.Errorf("failed to index profile record, invalid db cast")
610 }
611
612 // get record from db first
613 spindles, err := db.GetSpindles(
614 ctx,
615 ddb,
616 orm.FilterEq("owner", did),
617 orm.FilterEq("instance", instance),
618 )
619 if err != nil || len(spindles) != 1 {
620 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
621 }
622 spindle := spindles[0]
623
624 tx, err := ddb.Begin()
625 if err != nil {
626 return err
627 }
628 defer func() {
629 tx.Rollback()
630 i.Enforcer.E.LoadPolicy()
631 }()
632
633 // remove spindle members first
634 err = db.RemoveSpindleMember(
635 tx,
636 orm.FilterEq("owner", did),
637 orm.FilterEq("instance", instance),
638 )
639 if err != nil {
640 return err
641 }
642
643 err = db.DeleteSpindle(
644 tx,
645 orm.FilterEq("owner", did),
646 orm.FilterEq("instance", instance),
647 )
648 if err != nil {
649 return err
650 }
651
652 if spindle.Verified != nil {
653 err = i.Enforcer.RemoveSpindle(instance)
654 if err != nil {
655 return err
656 }
657 }
658
659 err = tx.Commit()
660 if err != nil {
661 return err
662 }
663
664 err = i.Enforcer.E.SavePolicy()
665 if err != nil {
666 return err
667 }
668 }
669
670 return nil
671}
672
673func (i *Ingester) ingestString(e *jmodels.Event) error {
674 did := e.Did
675 rkey := e.Commit.RKey
676
677 var err error
678
679 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
680 l.Info("ingesting record")
681
682 ddb, ok := i.Db.Execer.(*db.DB)
683 if !ok {
684 return fmt.Errorf("failed to index string record, invalid db cast")
685 }
686
687 switch e.Commit.Operation {
688 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
689 raw := json.RawMessage(e.Commit.Record)
690 record := tangled.String{}
691 err = json.Unmarshal(raw, &record)
692 if err != nil {
693 l.Error("invalid record", "err", err)
694 return err
695 }
696
697 string := models.StringFromRecord(did, rkey, record)
698
699 if err = i.Validator.ValidateString(&string); err != nil {
700 l.Error("invalid record", "err", err)
701 return err
702 }
703
704 if err = db.AddString(ddb, string); err != nil {
705 l.Error("failed to add string", "err", err)
706 return err
707 }
708
709 return nil
710
711 case jmodels.CommitOperationDelete:
712 if err := db.DeleteString(
713 ddb,
714 orm.FilterEq("did", did),
715 orm.FilterEq("rkey", rkey),
716 ); err != nil {
717 l.Error("failed to delete", "err", err)
718 return fmt.Errorf("failed to delete string record: %w", err)
719 }
720
721 return nil
722 }
723
724 return nil
725}
726
727func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
728 did := e.Did
729 var err error
730
731 l := i.Logger.With("handler", "ingestKnotMember")
732 l = l.With("nsid", e.Commit.Collection)
733
734 switch e.Commit.Operation {
735 case jmodels.CommitOperationCreate:
736 raw := json.RawMessage(e.Commit.Record)
737 record := tangled.KnotMember{}
738 err = json.Unmarshal(raw, &record)
739 if err != nil {
740 l.Error("invalid record", "err", err)
741 return err
742 }
743
744 // only knot owner can invite to knots
745 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
746 if err != nil || !ok {
747 return fmt.Errorf("failed to enforce permissions: %w", err)
748 }
749
750 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
751 if err != nil {
752 return err
753 }
754
755 if memberId.Handle.IsInvalidHandle() {
756 return err
757 }
758
759 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
760 if err != nil {
761 return fmt.Errorf("failed to update ACLs: %w", err)
762 }
763
764 l.Info("added knot member")
765 case jmodels.CommitOperationDelete:
766 // we don't store knot members in a table (like we do for spindle)
767 // and we can't remove this just yet. possibly fixed if we switch
768 // to either:
769 // 1. a knot_members table like with spindle and store the rkey
770 // 2. use the knot host as the rkey
771 //
772 // TODO: implement member deletion
773 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
774 }
775
776 return nil
777}
778
779func (i *Ingester) ingestKnot(e *jmodels.Event) error {
780 did := e.Did
781 var err error
782
783 l := i.Logger.With("handler", "ingestKnot")
784 l = l.With("nsid", e.Commit.Collection)
785
786 switch e.Commit.Operation {
787 case jmodels.CommitOperationCreate:
788 raw := json.RawMessage(e.Commit.Record)
789 record := tangled.Knot{}
790 err = json.Unmarshal(raw, &record)
791 if err != nil {
792 l.Error("invalid record", "err", err)
793 return err
794 }
795
796 domain := e.Commit.RKey
797
798 ddb, ok := i.Db.Execer.(*db.DB)
799 if !ok {
800 return fmt.Errorf("failed to index profile record, invalid db cast")
801 }
802
803 err := db.AddKnot(ddb, domain, did)
804 if err != nil {
805 l.Error("failed to add knot to db", "err", err, "domain", domain)
806 return err
807 }
808
809 err = retry.Do(
810 func() error {
811 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
812 },
813 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
814 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
815 )
816 if err != nil {
817 l.Error("failed to verify knot after retries", "err", err, "domain", domain)
818 return err
819 }
820
821 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
822 if err != nil {
823 return fmt.Errorf("failed to mark verified: %w", err)
824 }
825
826 return nil
827
828 case jmodels.CommitOperationDelete:
829 domain := e.Commit.RKey
830
831 ddb, ok := i.Db.Execer.(*db.DB)
832 if !ok {
833 return fmt.Errorf("failed to index knot record, invalid db cast")
834 }
835
836 // get record from db first
837 registrations, err := db.GetRegistrations(
838 ddb,
839 orm.FilterEq("domain", domain),
840 orm.FilterEq("did", did),
841 )
842 if err != nil {
843 return fmt.Errorf("failed to get registration: %w", err)
844 }
845 if len(registrations) != 1 {
846 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
847 }
848 registration := registrations[0]
849
850 tx, err := ddb.Begin()
851 if err != nil {
852 return err
853 }
854 defer func() {
855 tx.Rollback()
856 i.Enforcer.E.LoadPolicy()
857 }()
858
859 err = db.DeleteKnot(
860 tx,
861 orm.FilterEq("did", did),
862 orm.FilterEq("domain", domain),
863 )
864 if err != nil {
865 return err
866 }
867
868 if registration.Registered != nil {
869 err = i.Enforcer.RemoveKnot(domain)
870 if err != nil {
871 return err
872 }
873 }
874
875 err = tx.Commit()
876 if err != nil {
877 return err
878 }
879
880 err = i.Enforcer.E.SavePolicy()
881 if err != nil {
882 return err
883 }
884 }
885
886 return nil
887}
888func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
889 did := e.Did
890 rkey := e.Commit.RKey
891
892 var err error
893
894 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
895 l.Info("ingesting record")
896
897 ddb, ok := i.Db.Execer.(*db.DB)
898 if !ok {
899 return fmt.Errorf("failed to index issue record, invalid db cast")
900 }
901
902 switch e.Commit.Operation {
903 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
904 raw := json.RawMessage(e.Commit.Record)
905 record := tangled.RepoIssue{}
906 err = json.Unmarshal(raw, &record)
907 if err != nil {
908 l.Error("invalid record", "err", err)
909 return err
910 }
911
912 issue := models.IssueFromRecord(did, rkey, record)
913
914 if issue.RepoAt == "" {
915 return fmt.Errorf("issue record has no repo field")
916 }
917
918 if err := i.Validator.ValidateIssue(&issue); err != nil {
919 return fmt.Errorf("failed to validate issue: %w", err)
920 }
921
922 if record.Repo != nil {
923 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo)
924 if repoErr == nil && repo.RepoDid != "" {
925 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil {
926 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
927 }
928 }
929 }
930
931 tx, err := ddb.BeginTx(ctx, nil)
932 if err != nil {
933 l.Error("failed to begin transaction", "err", err)
934 return err
935 }
936 defer tx.Rollback()
937
938 err = db.PutIssue(tx, &issue)
939 if err != nil {
940 l.Error("failed to create issue", "err", err)
941 return err
942 }
943
944 err = tx.Commit()
945 if err != nil {
946 l.Error("failed to commit txn", "err", err)
947 return err
948 }
949
950 return nil
951
952 case jmodels.CommitOperationDelete:
953 tx, err := ddb.BeginTx(ctx, nil)
954 if err != nil {
955 l.Error("failed to begin transaction", "err", err)
956 return err
957 }
958 defer tx.Rollback()
959
960 if err := db.DeleteIssues(
961 tx,
962 did,
963 rkey,
964 ); err != nil {
965 l.Error("failed to delete", "err", err)
966 return fmt.Errorf("failed to delete issue record: %w", err)
967 }
968 if err := tx.Commit(); err != nil {
969 l.Error("failed to commit txn", "err", err)
970 return err
971 }
972
973 return nil
974 }
975
976 return nil
977}
978
979func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error {
980 did := e.Did
981 rkey := e.Commit.RKey
982
983 var err error
984
985 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
986 l.Info("ingesting record")
987
988 ddb, ok := i.Db.Execer.(*db.DB)
989 if !ok {
990 return fmt.Errorf("failed to index pull record, invalid db cast")
991 }
992
993 switch e.Commit.Operation {
994 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
995 raw := json.RawMessage(e.Commit.Record)
996 record := tangled.RepoPull{}
997 err = json.Unmarshal(raw, &record)
998 if err != nil {
999 l.Error("invalid record", "err", err)
1000 return err
1001 }
1002
1003 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
1004 if err != nil {
1005 l.Error("failed to resolve did")
1006 return err
1007 }
1008
1009 // go through and fetch all blobs in parallel
1010 readers := make([]*io.ReadCloser, len(record.Rounds))
1011 var mu sync.Mutex
1012
1013 g, gctx := errgroup.WithContext(ctx)
1014
1015 for idx, b := range record.Rounds {
1016 g.Go(func() error {
1017 // for some reason, a blob is empty
1018 if b.PatchBlob == nil {
1019 return fmt.Errorf("missing patchBlob in round %d", idx)
1020 }
1021
1022 ownerPds := ownerId.PDSEndpoint()
1023 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1024 q := url.Query()
1025 q.Set("cid", b.PatchBlob.Ref.String())
1026 q.Set("did", did)
1027 url.RawQuery = q.Encode()
1028
1029 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1030 if err != nil {
1031 l.Error("failed to create request")
1032 return err
1033 }
1034 req.Header.Set("Content-Type", "application/json")
1035
1036 resp, err := http.DefaultClient.Do(req)
1037 if err != nil {
1038 l.Error("failed to make request")
1039 return err
1040 }
1041
1042 mu.Lock()
1043 readers[idx] = &resp.Body
1044 mu.Unlock()
1045
1046 return nil
1047 })
1048 }
1049
1050 if err := g.Wait(); err != nil {
1051 for _, r := range readers {
1052 if r != nil && *r != nil {
1053 (*r).Close()
1054 }
1055 }
1056 return err
1057 }
1058
1059 defer func() {
1060 for _, r := range readers {
1061 if r != nil && *r != nil {
1062 (*r).Close()
1063 }
1064 }
1065 }()
1066
1067 pull, err := models.PullFromRecord(did, rkey, record, readers)
1068 if err != nil {
1069 return fmt.Errorf("failed to parse pull from record: %w", err)
1070 }
1071 if err := i.Validator.ValidatePull(pull); err != nil {
1072 return fmt.Errorf("failed to validate pull: %w", err)
1073 }
1074
1075 tx, err := ddb.BeginTx(ctx, nil)
1076 if err != nil {
1077 l.Error("failed to begin transaction", "err", err)
1078 return err
1079 }
1080 defer tx.Rollback()
1081
1082 err = db.PutPull(tx, pull)
1083 if err != nil {
1084 l.Error("failed to create pull", "err", err)
1085 return err
1086 }
1087
1088 err = tx.Commit()
1089 if err != nil {
1090 l.Error("failed to commit txn", "err", err)
1091 return err
1092 }
1093
1094 return nil
1095
1096 case jmodels.CommitOperationDelete:
1097 tx, err := ddb.BeginTx(ctx, nil)
1098 if err != nil {
1099 l.Error("failed to begin transaction", "err", err)
1100 return err
1101 }
1102 defer tx.Rollback()
1103
1104 if err := db.AbandonPulls(
1105 tx,
1106 orm.FilterEq("owner_did", did),
1107 orm.FilterEq("rkey", rkey),
1108 ); err != nil {
1109 l.Error("failed to abandon", "err", err)
1110 return fmt.Errorf("failed to abandon pull record: %w", err)
1111 }
1112 if err := tx.Commit(); err != nil {
1113 l.Error("failed to commit txn", "err", err)
1114 return err
1115 }
1116
1117 return nil
1118 }
1119
1120 return nil
1121}
1122
1123func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
1124 did := e.Did
1125 rkey := e.Commit.RKey
1126
1127 var err error
1128
1129 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1130 l.Info("ingesting record")
1131
1132 ddb, ok := i.Db.Execer.(*db.DB)
1133 if !ok {
1134 return fmt.Errorf("failed to index issue comment record, invalid db cast")
1135 }
1136
1137 switch e.Commit.Operation {
1138 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1139 raw := json.RawMessage(e.Commit.Record)
1140 record := tangled.RepoIssueComment{}
1141 err = json.Unmarshal(raw, &record)
1142 if err != nil {
1143 return fmt.Errorf("invalid record: %w", err)
1144 }
1145
1146 comment, err := models.IssueCommentFromRecord(did, rkey, record)
1147 if err != nil {
1148 return fmt.Errorf("failed to parse comment from record: %w", err)
1149 }
1150
1151 if err := i.Validator.ValidateIssueComment(comment); err != nil {
1152 return fmt.Errorf("failed to validate comment: %w", err)
1153 }
1154
1155 tx, err := ddb.Begin()
1156 if err != nil {
1157 return fmt.Errorf("failed to start transaction: %w", err)
1158 }
1159 defer tx.Rollback()
1160
1161 _, err = db.AddIssueComment(tx, *comment)
1162 if err != nil {
1163 return fmt.Errorf("failed to create issue comment: %w", err)
1164 }
1165
1166 return tx.Commit()
1167
1168 case jmodels.CommitOperationDelete:
1169 if err := db.DeleteIssueComments(
1170 ddb,
1171 orm.FilterEq("did", did),
1172 orm.FilterEq("rkey", rkey),
1173 ); err != nil {
1174 return fmt.Errorf("failed to delete issue comment record: %w", err)
1175 }
1176
1177 return nil
1178 }
1179
1180 return nil
1181}
1182
1183func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
1184 did := e.Did
1185 rkey := e.Commit.RKey
1186
1187 var err error
1188
1189 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1190 l.Info("ingesting record")
1191
1192 ddb, ok := i.Db.Execer.(*db.DB)
1193 if !ok {
1194 return fmt.Errorf("failed to index label definition, invalid db cast")
1195 }
1196
1197 switch e.Commit.Operation {
1198 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1199 raw := json.RawMessage(e.Commit.Record)
1200 record := tangled.LabelDefinition{}
1201 err = json.Unmarshal(raw, &record)
1202 if err != nil {
1203 return fmt.Errorf("invalid record: %w", err)
1204 }
1205
1206 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1207 if err != nil {
1208 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1209 }
1210
1211 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1212 return fmt.Errorf("failed to validate labeldef: %w", err)
1213 }
1214
1215 _, err = db.AddLabelDefinition(ddb, def)
1216 if err != nil {
1217 return fmt.Errorf("failed to create labeldef: %w", err)
1218 }
1219
1220 return nil
1221
1222 case jmodels.CommitOperationDelete:
1223 if err := db.DeleteLabelDefinition(
1224 ddb,
1225 orm.FilterEq("did", did),
1226 orm.FilterEq("rkey", rkey),
1227 ); err != nil {
1228 return fmt.Errorf("failed to delete labeldef record: %w", err)
1229 }
1230
1231 return nil
1232 }
1233
1234 return nil
1235}
1236
1237func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1238 did := e.Did
1239 rkey := e.Commit.RKey
1240
1241 var err error
1242
1243 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1244 l.Info("ingesting record")
1245
1246 ddb, ok := i.Db.Execer.(*db.DB)
1247 if !ok {
1248 return fmt.Errorf("failed to index label op, invalid db cast")
1249 }
1250
1251 switch e.Commit.Operation {
1252 case jmodels.CommitOperationCreate:
1253 raw := json.RawMessage(e.Commit.Record)
1254 record := tangled.LabelOp{}
1255 err = json.Unmarshal(raw, &record)
1256 if err != nil {
1257 return fmt.Errorf("invalid record: %w", err)
1258 }
1259
1260 subject := syntax.ATURI(record.Subject)
1261 collection := subject.Collection()
1262
1263 var repo *models.Repo
1264 switch collection {
1265 case tangled.RepoIssueNSID:
1266 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1267 if err != nil || len(i) != 1 {
1268 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1269 }
1270 repo = i[0].Repo
1271 default:
1272 return fmt.Errorf("unsupported label subject: %s", collection)
1273 }
1274
1275 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1276 if err != nil {
1277 return fmt.Errorf("failed to build label application ctx: %w", err)
1278 }
1279
1280 ops := models.LabelOpsFromRecord(did, rkey, record)
1281
1282 for _, o := range ops {
1283 def, ok := actx.Defs[o.OperandKey]
1284 if !ok {
1285 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1286 }
1287 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1288 return fmt.Errorf("failed to validate labelop: %w", err)
1289 }
1290 }
1291
1292 tx, err := ddb.Begin()
1293 if err != nil {
1294 return err
1295 }
1296 defer tx.Rollback()
1297
1298 for _, o := range ops {
1299 _, err = db.AddLabelOp(tx, &o)
1300 if err != nil {
1301 return fmt.Errorf("failed to add labelop: %w", err)
1302 }
1303 }
1304
1305 if err = tx.Commit(); err != nil {
1306 return err
1307 }
1308 }
1309
1310 return nil
1311}