forked from
tangled.org/core
Monorepo for Tangled
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "net/http"
13 "net/url"
14 "slices"
15 "sync"
16
17 "time"
18
19 "github.com/avast/retry-go/v4"
20 "github.com/bluesky-social/indigo/atproto/syntax"
21 jmodels "github.com/bluesky-social/jetstream/pkg/models"
22 "github.com/go-git/go-git/v5/plumbing"
23 "github.com/ipfs/go-cid"
24 "golang.org/x/sync/errgroup"
25 "tangled.org/core/api/tangled"
26 "tangled.org/core/appview/cache"
27 "tangled.org/core/appview/config"
28 "tangled.org/core/appview/db"
29 "tangled.org/core/appview/models"
30 "tangled.org/core/appview/serververify"
31 "tangled.org/core/appview/validator"
32 "tangled.org/core/idresolver"
33 "tangled.org/core/orm"
34 "tangled.org/core/rbac"
35)
36
37type Ingester struct {
38 Db db.DbWrapper
39 Enforcer *rbac.Enforcer
40 IdResolver *idresolver.Resolver
41 Cache *cache.Cache
42 Config *config.Config
43 Logger *slog.Logger
44 Validator *validator.Validator
45}
46
47type processFunc func(ctx context.Context, e *jmodels.Event) error
48
49func (i *Ingester) Ingest() processFunc {
50 return func(ctx context.Context, e *jmodels.Event) error {
51 var err error
52
53 l := i.Logger.With("kind", e.Kind)
54 switch e.Kind {
55 case jmodels.EventKindAccount:
56 // TODO: sync account state to db
57 if e.Account.Active {
58 break
59 }
60 // TODO: revoke sessions by DID
61 if *e.Account.Status == "deactivated" {
62 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
63 }
64 case jmodels.EventKindIdentity:
65 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
66 case jmodels.EventKindCommit:
67 switch e.Commit.Collection {
68 case tangled.GraphFollowNSID:
69 err = i.ingestFollow(e)
70 case tangled.GraphVouchNSID:
71 err = i.ingestVouch(ctx, e)
72 case tangled.FeedStarNSID:
73 err = i.ingestStar(e)
74 case tangled.PublicKeyNSID:
75 err = i.ingestPublicKey(e)
76 case tangled.RepoArtifactNSID:
77 err = i.ingestArtifact(e)
78 case tangled.ActorProfileNSID:
79 err = i.ingestProfile(ctx, e)
80 case tangled.SpindleMemberNSID:
81 err = i.ingestSpindleMember(ctx, e)
82 case tangled.SpindleNSID:
83 err = i.ingestSpindle(ctx, e)
84 case tangled.KnotMemberNSID:
85 err = i.ingestKnotMember(e)
86 case tangled.KnotNSID:
87 err = i.ingestKnot(e)
88 case tangled.StringNSID:
89 err = i.ingestString(e)
90 case tangled.RepoIssueNSID:
91 err = i.ingestIssue(ctx, e)
92 case tangled.RepoPullNSID:
93 err = i.ingestPull(ctx, e)
94 case tangled.RepoIssueCommentNSID:
95 err = i.ingestIssueComment(e)
96 case tangled.LabelDefinitionNSID:
97 err = i.ingestLabelDefinition(e)
98 case tangled.LabelOpNSID:
99 err = i.ingestLabelOp(e)
100 }
101 l = i.Logger.With("nsid", e.Commit.Collection)
102 }
103
104 if err != nil {
105 l.Warn("failed to ingest record, skipping", "err", err)
106 }
107
108 lastTimeUs := e.TimeUS + 1
109 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
110 l.Error("failed to save cursor", "err", saveErr)
111 }
112
113 return nil
114 }
115}
116
117func (i *Ingester) ingestStar(e *jmodels.Event) error {
118 var err error
119 did := e.Did
120
121 l := i.Logger.With("handler", "ingestStar")
122 l = l.With("nsid", e.Commit.Collection)
123
124 switch e.Commit.Operation {
125 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
126 var subjectUri syntax.ATURI
127
128 raw := json.RawMessage(e.Commit.Record)
129 record := tangled.FeedStar{}
130 err := json.Unmarshal(raw, &record)
131 if err != nil {
132 l.Error("invalid record", "err", err)
133 return err
134 }
135
136 star := &models.Star{
137 Did: did,
138 Rkey: e.Commit.RKey,
139 }
140
141 switch {
142 case record.SubjectDid != nil:
143 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid))
144 if repoErr == nil {
145 subjectUri = repo.RepoAt()
146 star.RepoAt = subjectUri
147 }
148 case record.Subject != nil:
149 subjectUri, err = syntax.ParseATURI(*record.Subject)
150 if err != nil {
151 l.Error("invalid record", "err", err)
152 return err
153 }
154 star.RepoAt = subjectUri
155 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String())
156 if repoErr == nil && repo.RepoDid != "" {
157 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil {
158 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
159 }
160 }
161 default:
162 l.Error("star record has neither subject nor subjectDid")
163 return fmt.Errorf("star record has neither subject nor subjectDid")
164 }
165 err = db.AddStar(i.Db, star)
166 case jmodels.CommitOperationDelete:
167 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
168 }
169
170 if err != nil {
171 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
172 }
173
174 return nil
175}
176
177func (i *Ingester) ingestFollow(e *jmodels.Event) error {
178 var err error
179 did := e.Did
180
181 l := i.Logger.With("handler", "ingestFollow")
182 l = l.With("nsid", e.Commit.Collection)
183
184 switch e.Commit.Operation {
185 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
186 raw := json.RawMessage(e.Commit.Record)
187 record := tangled.GraphFollow{}
188 err = json.Unmarshal(raw, &record)
189 if err != nil {
190 l.Error("invalid record", "err", err)
191 return err
192 }
193
194 err = db.AddFollow(i.Db, &models.Follow{
195 UserDid: did,
196 SubjectDid: record.Subject,
197 Rkey: e.Commit.RKey,
198 })
199 case jmodels.CommitOperationDelete:
200 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
201 }
202
203 if err != nil {
204 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
205 }
206
207 return nil
208}
209
210func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error {
211 var err error
212 did := e.Did
213
214 l := i.Logger.With("handler", "ingestVouch")
215 l = l.With("nsid", e.Commit.Collection)
216 l.Info("ingesting vouch")
217
218 switch e.Commit.Operation {
219 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
220 raw := json.RawMessage(e.Commit.Record)
221 record := tangled.GraphVouch{}
222 err = json.Unmarshal(raw, &record)
223 if err != nil {
224 l.Error("invalid record", "err", err)
225 return err
226 }
227
228 // rkey is the subject_did being vouched for/denounced
229 subjectDID := e.Commit.RKey
230
231 _, err = syntax.ParseDID(subjectDID)
232 if err != nil {
233 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID)
234 return fmt.Errorf("invalid subject_did: %w", err)
235 }
236
237 if did == subjectDID {
238 l.Warn("attempted self-vouch", "did", did)
239 return fmt.Errorf("cannot vouch for self")
240 }
241
242 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID)
243 if err != nil {
244 return err
245 }
246
247 if subjectId.Handle.IsInvalidHandle() {
248 return err
249 }
250
251 kind, err := models.ParseVouchKind(record.Kind)
252 if err != nil {
253 l.Error("invalid kind", "kind", kind)
254 return fmt.Errorf("invalid kind: %s", kind)
255 }
256
257 recordCid, err := cid.Parse(e.Commit.CID)
258 if err != nil {
259 l.Error("invalid cid", "err", err, "cid", e.Commit.CID)
260 return fmt.Errorf("invalid cid: %w", err)
261 }
262
263 err = db.AddVouch(i.Db, &models.Vouch{
264 Did: syntax.DID(did),
265 SubjectDid: subjectId.DID,
266 Cid: recordCid,
267 Kind: kind,
268 Reason: record.Reason,
269 })
270
271 case jmodels.CommitOperationDelete:
272 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey)
273 }
274
275 if err != nil {
276 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err)
277 }
278
279 return nil
280}
281
282func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
283 did := e.Did
284 var err error
285
286 l := i.Logger.With("handler", "ingestPublicKey")
287 l = l.With("nsid", e.Commit.Collection)
288
289 switch e.Commit.Operation {
290 case jmodels.CommitOperationCreate:
291 l.Debug("processing add of pubkey")
292 raw := json.RawMessage(e.Commit.Record)
293 record := tangled.PublicKey{}
294 err = json.Unmarshal(raw, &record)
295 if err != nil {
296 l.Error("invalid record", "err", err)
297 return err
298 }
299
300 name := record.Name
301 key := record.Key
302 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
303 case jmodels.CommitOperationUpdate:
304 l.Debug("processing update of pubkey")
305 raw := json.RawMessage(e.Commit.Record)
306 record := tangled.PublicKey{}
307 err = json.Unmarshal(raw, &record)
308 if err != nil {
309 l.Error("invalid record", "err", err)
310 return err
311 }
312
313 name := record.Name
314 key := record.Key
315 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey)
316 case jmodels.CommitOperationDelete:
317 l.Debug("processing delete of pubkey")
318 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
319 }
320
321 if err != nil {
322 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
323 }
324
325 return nil
326}
327
328func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
329 did := e.Did
330 var err error
331
332 l := i.Logger.With("handler", "ingestArtifact")
333 l = l.With("nsid", e.Commit.Collection)
334
335 switch e.Commit.Operation {
336 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
337 raw := json.RawMessage(e.Commit.Record)
338 record := tangled.RepoArtifact{}
339 err = json.Unmarshal(raw, &record)
340 if err != nil {
341 l.Error("invalid record", "err", err)
342 return err
343 }
344
345 var repo *models.Repo
346 if record.RepoDid != nil && *record.RepoDid != "" {
347 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
348 if err != nil && !errors.Is(err, sql.ErrNoRows) {
349 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
350 }
351 }
352 if repo == nil && record.Repo != nil {
353 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
354 if parseErr != nil {
355 return parseErr
356 }
357 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
358 if err != nil {
359 return err
360 }
361 }
362 if repo == nil {
363 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
364 }
365
366 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push")
367 if err != nil || !ok {
368 return err
369 }
370
371 repoDid := repo.RepoDid
372 if repoDid == "" && record.RepoDid != nil {
373 repoDid = *record.RepoDid
374 }
375 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
376 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil {
377 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
378 }
379 }
380
381 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
382 if err != nil {
383 createdAt = time.Now()
384 }
385
386 artifact := models.Artifact{
387 Did: did,
388 Rkey: e.Commit.RKey,
389 RepoAt: repo.RepoAt(),
390 Tag: plumbing.Hash(record.Tag),
391 CreatedAt: createdAt,
392 BlobCid: cid.Cid(record.Artifact.Ref),
393 Name: record.Name,
394 Size: uint64(record.Artifact.Size),
395 MimeType: record.Artifact.MimeType,
396 }
397
398 err = db.AddArtifact(i.Db, artifact)
399 case jmodels.CommitOperationDelete:
400 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
401 }
402
403 if err != nil {
404 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
405 }
406
407 return nil
408}
409
410func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error {
411 did := e.Did
412 var err error
413
414 l := i.Logger.With("handler", "ingestProfile")
415 l = l.With("nsid", e.Commit.Collection)
416
417 if e.Commit.RKey != "self" {
418 return fmt.Errorf("ingestProfile only ingests `self` record")
419 }
420
421 switch e.Commit.Operation {
422 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
423 raw := json.RawMessage(e.Commit.Record)
424 record := tangled.ActorProfile{}
425 err = json.Unmarshal(raw, &record)
426 if err != nil {
427 l.Error("invalid record", "err", err)
428 return err
429 }
430
431 avatar := ""
432 if record.Avatar != nil {
433 avatar = record.Avatar.Ref.String()
434 }
435
436 description := ""
437 if record.Description != nil {
438 description = *record.Description
439 }
440
441 includeBluesky := record.Bluesky
442
443 pronouns := ""
444 if record.Pronouns != nil {
445 pronouns = *record.Pronouns
446 }
447
448 location := ""
449 if record.Location != nil {
450 location = *record.Location
451 }
452
453 var links [5]string
454 for i, l := range record.Links {
455 if i < 5 {
456 links[i] = l
457 }
458 }
459
460 var stats [2]models.VanityStat
461 for i, s := range record.Stats {
462 if i < 2 {
463 stats[i].Kind = models.ParseVanityStatKind(s)
464 }
465 }
466
467 var pinned [6]string
468 for i, r := range record.PinnedRepositories {
469 if i < 6 {
470 pinned[i] = r
471 }
472 }
473
474 var preferredHandle syntax.Handle
475 if record.PreferredHandle != nil {
476 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
477 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
478 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
479 preferredHandle = h
480 }
481 }
482 }
483
484 profile := models.Profile{
485 Did: did,
486 Avatar: avatar,
487 Description: description,
488 IncludeBluesky: includeBluesky,
489 Location: location,
490 Links: links,
491 Stats: stats,
492 PinnedRepos: pinned,
493 Pronouns: pronouns,
494 PreferredHandle: preferredHandle,
495 }
496
497 ddb, ok := i.Db.Execer.(*db.DB)
498 if !ok {
499 return fmt.Errorf("failed to index profile record, invalid db cast")
500 }
501
502 tx, err := ddb.Begin()
503 if err != nil {
504 return fmt.Errorf("failed to start transaction")
505 }
506
507 err = db.ValidateProfile(tx, &profile)
508 if err != nil {
509 return fmt.Errorf("invalid profile record")
510 }
511
512 err = db.UpsertProfile(tx, &profile)
513 if err == nil && i.Cache != nil {
514 pipe := i.Cache.Pipeline()
515 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did)
516 if preferredHandle != "" {
517 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL)
518 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL)
519 } else {
520 pipe.Del(ctx, didKey)
521 }
522 if _, execErr := pipe.Exec(ctx); execErr != nil {
523 l.Warn("failed to update preferred handle cache", "err", execErr)
524 }
525 }
526 case jmodels.CommitOperationDelete:
527 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
528 }
529
530 if err != nil {
531 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
532 }
533
534 return nil
535}
536
537func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
538 did := e.Did
539 var err error
540
541 l := i.Logger.With("handler", "ingestSpindleMember")
542 l = l.With("nsid", e.Commit.Collection)
543
544 switch e.Commit.Operation {
545 case jmodels.CommitOperationCreate:
546 raw := json.RawMessage(e.Commit.Record)
547 record := tangled.SpindleMember{}
548 err = json.Unmarshal(raw, &record)
549 if err != nil {
550 l.Error("invalid record", "err", err)
551 return err
552 }
553
554 // only spindle owner can invite to spindles
555 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
556 if err != nil || !ok {
557 return fmt.Errorf("failed to enforce permissions: %w", err)
558 }
559
560 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
561 if err != nil {
562 return err
563 }
564
565 if memberId.Handle.IsInvalidHandle() {
566 return err
567 }
568
569 ddb, ok := i.Db.Execer.(*db.DB)
570 if !ok {
571 return fmt.Errorf("invalid db cast")
572 }
573
574 err = db.AddSpindleMember(ddb, models.SpindleMember{
575 Did: syntax.DID(did),
576 Rkey: e.Commit.RKey,
577 Instance: record.Instance,
578 Subject: memberId.DID,
579 })
580 if !ok {
581 return fmt.Errorf("failed to add to db: %w", err)
582 }
583
584 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
585 if err != nil {
586 return fmt.Errorf("failed to update ACLs: %w", err)
587 }
588
589 l.Info("added spindle member")
590 case jmodels.CommitOperationDelete:
591 rkey := e.Commit.RKey
592
593 ddb, ok := i.Db.Execer.(*db.DB)
594 if !ok {
595 return fmt.Errorf("failed to index profile record, invalid db cast")
596 }
597
598 // get record from db first
599 members, err := db.GetSpindleMembers(
600 ddb,
601 orm.FilterEq("did", did),
602 orm.FilterEq("rkey", rkey),
603 )
604 if err != nil || len(members) != 1 {
605 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
606 }
607 member := members[0]
608
609 tx, err := ddb.Begin()
610 if err != nil {
611 return fmt.Errorf("failed to start txn: %w", err)
612 }
613
614 // remove record by rkey && update enforcer
615 if err = db.RemoveSpindleMember(
616 tx,
617 orm.FilterEq("did", did),
618 orm.FilterEq("rkey", rkey),
619 ); err != nil {
620 return fmt.Errorf("failed to remove from db: %w", err)
621 }
622
623 // update enforcer
624 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
625 if err != nil {
626 return fmt.Errorf("failed to update ACLs: %w", err)
627 }
628
629 if err = tx.Commit(); err != nil {
630 return fmt.Errorf("failed to commit txn: %w", err)
631 }
632
633 if err = i.Enforcer.E.SavePolicy(); err != nil {
634 return fmt.Errorf("failed to save ACLs: %w", err)
635 }
636
637 l.Info("removed spindle member")
638 }
639
640 return nil
641}
642
643func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
644 did := e.Did
645 var err error
646
647 l := i.Logger.With("handler", "ingestSpindle")
648 l = l.With("nsid", e.Commit.Collection)
649
650 switch e.Commit.Operation {
651 case jmodels.CommitOperationCreate:
652 raw := json.RawMessage(e.Commit.Record)
653 record := tangled.Spindle{}
654 err = json.Unmarshal(raw, &record)
655 if err != nil {
656 l.Error("invalid record", "err", err)
657 return err
658 }
659
660 instance := e.Commit.RKey
661
662 ddb, ok := i.Db.Execer.(*db.DB)
663 if !ok {
664 return fmt.Errorf("failed to index profile record, invalid db cast")
665 }
666
667 err := db.AddSpindle(ddb, models.Spindle{
668 Owner: syntax.DID(did),
669 Instance: instance,
670 })
671 if err != nil {
672 l.Error("failed to add spindle to db", "err", err, "instance", instance)
673 return err
674 }
675
676 err = retry.Do(
677 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
678 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
679 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
680 )
681 if err != nil {
682 l.Error("failed to verify spindle after retries", "err", err, "instance", instance)
683 return err
684 }
685
686 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
687 if err != nil {
688 return fmt.Errorf("failed to mark verified: %w", err)
689 }
690
691 return nil
692
693 case jmodels.CommitOperationDelete:
694 instance := e.Commit.RKey
695
696 ddb, ok := i.Db.Execer.(*db.DB)
697 if !ok {
698 return fmt.Errorf("failed to index profile record, invalid db cast")
699 }
700
701 // get record from db first
702 spindles, err := db.GetSpindles(
703 ctx,
704 ddb,
705 orm.FilterEq("owner", did),
706 orm.FilterEq("instance", instance),
707 )
708 if err != nil || len(spindles) != 1 {
709 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
710 }
711 spindle := spindles[0]
712
713 tx, err := ddb.Begin()
714 if err != nil {
715 return err
716 }
717 defer func() {
718 tx.Rollback()
719 i.Enforcer.E.LoadPolicy()
720 }()
721
722 // remove spindle members first
723 err = db.RemoveSpindleMember(
724 tx,
725 orm.FilterEq("owner", did),
726 orm.FilterEq("instance", instance),
727 )
728 if err != nil {
729 return err
730 }
731
732 err = db.DeleteSpindle(
733 tx,
734 orm.FilterEq("owner", did),
735 orm.FilterEq("instance", instance),
736 )
737 if err != nil {
738 return err
739 }
740
741 if spindle.Verified != nil {
742 err = i.Enforcer.RemoveSpindle(instance)
743 if err != nil {
744 return err
745 }
746 }
747
748 err = tx.Commit()
749 if err != nil {
750 return err
751 }
752
753 err = i.Enforcer.E.SavePolicy()
754 if err != nil {
755 return err
756 }
757 }
758
759 return nil
760}
761
762func (i *Ingester) ingestString(e *jmodels.Event) error {
763 did := e.Did
764 rkey := e.Commit.RKey
765
766 var err error
767
768 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
769 l.Info("ingesting record")
770
771 ddb, ok := i.Db.Execer.(*db.DB)
772 if !ok {
773 return fmt.Errorf("failed to index string record, invalid db cast")
774 }
775
776 switch e.Commit.Operation {
777 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
778 raw := json.RawMessage(e.Commit.Record)
779 record := tangled.String{}
780 err = json.Unmarshal(raw, &record)
781 if err != nil {
782 l.Error("invalid record", "err", err)
783 return err
784 }
785
786 string := models.StringFromRecord(did, rkey, record)
787
788 if err = i.Validator.ValidateString(&string); err != nil {
789 l.Error("invalid record", "err", err)
790 return err
791 }
792
793 if err = db.AddString(ddb, string); err != nil {
794 l.Error("failed to add string", "err", err)
795 return err
796 }
797
798 return nil
799
800 case jmodels.CommitOperationDelete:
801 if err := db.DeleteString(
802 ddb,
803 orm.FilterEq("did", did),
804 orm.FilterEq("rkey", rkey),
805 ); err != nil {
806 l.Error("failed to delete", "err", err)
807 return fmt.Errorf("failed to delete string record: %w", err)
808 }
809
810 return nil
811 }
812
813 return nil
814}
815
816func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
817 did := e.Did
818 var err error
819
820 l := i.Logger.With("handler", "ingestKnotMember")
821 l = l.With("nsid", e.Commit.Collection)
822
823 switch e.Commit.Operation {
824 case jmodels.CommitOperationCreate:
825 raw := json.RawMessage(e.Commit.Record)
826 record := tangled.KnotMember{}
827 err = json.Unmarshal(raw, &record)
828 if err != nil {
829 l.Error("invalid record", "err", err)
830 return err
831 }
832
833 // only knot owner can invite to knots
834 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
835 if err != nil || !ok {
836 return fmt.Errorf("failed to enforce permissions: %w", err)
837 }
838
839 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
840 if err != nil {
841 return err
842 }
843
844 if memberId.Handle.IsInvalidHandle() {
845 return err
846 }
847
848 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
849 if err != nil {
850 return fmt.Errorf("failed to update ACLs: %w", err)
851 }
852
853 l.Info("added knot member")
854 case jmodels.CommitOperationDelete:
855 // we don't store knot members in a table (like we do for spindle)
856 // and we can't remove this just yet. possibly fixed if we switch
857 // to either:
858 // 1. a knot_members table like with spindle and store the rkey
859 // 2. use the knot host as the rkey
860 //
861 // TODO: implement member deletion
862 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
863 }
864
865 return nil
866}
867
868func (i *Ingester) ingestKnot(e *jmodels.Event) error {
869 did := e.Did
870 var err error
871
872 l := i.Logger.With("handler", "ingestKnot")
873 l = l.With("nsid", e.Commit.Collection)
874
875 switch e.Commit.Operation {
876 case jmodels.CommitOperationCreate:
877 raw := json.RawMessage(e.Commit.Record)
878 record := tangled.Knot{}
879 err = json.Unmarshal(raw, &record)
880 if err != nil {
881 l.Error("invalid record", "err", err)
882 return err
883 }
884
885 domain := e.Commit.RKey
886
887 ddb, ok := i.Db.Execer.(*db.DB)
888 if !ok {
889 return fmt.Errorf("failed to index profile record, invalid db cast")
890 }
891
892 err := db.AddKnot(ddb, domain, did)
893 if err != nil {
894 l.Error("failed to add knot to db", "err", err, "domain", domain)
895 return err
896 }
897
898 err = retry.Do(
899 func() error {
900 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
901 },
902 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
903 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
904 )
905 if err != nil {
906 l.Error("failed to verify knot after retries", "err", err, "domain", domain)
907 return err
908 }
909
910 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
911 if err != nil {
912 return fmt.Errorf("failed to mark verified: %w", err)
913 }
914
915 return nil
916
917 case jmodels.CommitOperationDelete:
918 domain := e.Commit.RKey
919
920 ddb, ok := i.Db.Execer.(*db.DB)
921 if !ok {
922 return fmt.Errorf("failed to index knot record, invalid db cast")
923 }
924
925 // get record from db first
926 registrations, err := db.GetRegistrations(
927 ddb,
928 orm.FilterEq("domain", domain),
929 orm.FilterEq("did", did),
930 )
931 if err != nil {
932 return fmt.Errorf("failed to get registration: %w", err)
933 }
934 if len(registrations) != 1 {
935 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
936 }
937 registration := registrations[0]
938
939 tx, err := ddb.Begin()
940 if err != nil {
941 return err
942 }
943 defer func() {
944 tx.Rollback()
945 i.Enforcer.E.LoadPolicy()
946 }()
947
948 err = db.DeleteKnot(
949 tx,
950 orm.FilterEq("did", did),
951 orm.FilterEq("domain", domain),
952 )
953 if err != nil {
954 return err
955 }
956
957 if registration.Registered != nil {
958 err = i.Enforcer.RemoveKnot(domain)
959 if err != nil {
960 return err
961 }
962 }
963
964 err = tx.Commit()
965 if err != nil {
966 return err
967 }
968
969 err = i.Enforcer.E.SavePolicy()
970 if err != nil {
971 return err
972 }
973 }
974
975 return nil
976}
977func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
978 did := e.Did
979 rkey := e.Commit.RKey
980
981 var err error
982
983 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
984 l.Info("ingesting record")
985
986 ddb, ok := i.Db.Execer.(*db.DB)
987 if !ok {
988 return fmt.Errorf("failed to index issue record, invalid db cast")
989 }
990
991 switch e.Commit.Operation {
992 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
993 raw := json.RawMessage(e.Commit.Record)
994 record := tangled.RepoIssue{}
995 err = json.Unmarshal(raw, &record)
996 if err != nil {
997 l.Error("invalid record", "err", err)
998 return err
999 }
1000
1001 issue := models.IssueFromRecord(did, rkey, record)
1002
1003 if issue.RepoAt == "" {
1004 return fmt.Errorf("issue record has no repo field")
1005 }
1006
1007 if err := i.Validator.ValidateIssue(&issue); err != nil {
1008 return fmt.Errorf("failed to validate issue: %w", err)
1009 }
1010
1011 if record.Repo != nil {
1012 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo)
1013 if repoErr == nil && repo.RepoDid != "" {
1014 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil {
1015 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
1016 }
1017 }
1018 }
1019
1020 tx, err := ddb.BeginTx(ctx, nil)
1021 if err != nil {
1022 l.Error("failed to begin transaction", "err", err)
1023 return err
1024 }
1025 defer tx.Rollback()
1026
1027 err = db.PutIssue(tx, &issue)
1028 if err != nil {
1029 l.Error("failed to create issue", "err", err)
1030 return err
1031 }
1032
1033 err = tx.Commit()
1034 if err != nil {
1035 l.Error("failed to commit txn", "err", err)
1036 return err
1037 }
1038
1039 return nil
1040
1041 case jmodels.CommitOperationDelete:
1042 tx, err := ddb.BeginTx(ctx, nil)
1043 if err != nil {
1044 l.Error("failed to begin transaction", "err", err)
1045 return err
1046 }
1047 defer tx.Rollback()
1048
1049 if err := db.DeleteIssues(
1050 tx,
1051 did,
1052 rkey,
1053 ); err != nil {
1054 l.Error("failed to delete", "err", err)
1055 return fmt.Errorf("failed to delete issue record: %w", err)
1056 }
1057 if err := tx.Commit(); err != nil {
1058 l.Error("failed to commit txn", "err", err)
1059 return err
1060 }
1061
1062 return nil
1063 }
1064
1065 return nil
1066}
1067
1068func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error {
1069 did := e.Did
1070 rkey := e.Commit.RKey
1071
1072 var err error
1073
1074 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1075 l.Info("ingesting record")
1076
1077 ddb, ok := i.Db.Execer.(*db.DB)
1078 if !ok {
1079 return fmt.Errorf("failed to index pull record, invalid db cast")
1080 }
1081
1082 switch e.Commit.Operation {
1083 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1084 raw := json.RawMessage(e.Commit.Record)
1085 record := tangled.RepoPull{}
1086 err = json.Unmarshal(raw, &record)
1087 if err != nil {
1088 l.Error("invalid record", "err", err)
1089 return err
1090 }
1091
1092 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
1093 if err != nil {
1094 l.Error("failed to resolve did")
1095 return err
1096 }
1097
1098 // go through and fetch all blobs in parallel
1099 readers := make([]*io.ReadCloser, len(record.Rounds))
1100 var mu sync.Mutex
1101
1102 g, gctx := errgroup.WithContext(ctx)
1103
1104 for idx, b := range record.Rounds {
1105 g.Go(func() error {
1106 // for some reason, a blob is empty
1107 if b.PatchBlob == nil {
1108 return fmt.Errorf("missing patchBlob in round %d", idx)
1109 }
1110
1111 ownerPds := ownerId.PDSEndpoint()
1112 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1113 q := url.Query()
1114 q.Set("cid", b.PatchBlob.Ref.String())
1115 q.Set("did", did)
1116 url.RawQuery = q.Encode()
1117
1118 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1119 if err != nil {
1120 l.Error("failed to create request")
1121 return err
1122 }
1123 req.Header.Set("Content-Type", "application/json")
1124
1125 resp, err := http.DefaultClient.Do(req)
1126 if err != nil {
1127 l.Error("failed to make request")
1128 return err
1129 }
1130
1131 mu.Lock()
1132 readers[idx] = &resp.Body
1133 mu.Unlock()
1134
1135 return nil
1136 })
1137 }
1138
1139 if err := g.Wait(); err != nil {
1140 for _, r := range readers {
1141 if r != nil && *r != nil {
1142 (*r).Close()
1143 }
1144 }
1145 return err
1146 }
1147
1148 defer func() {
1149 for _, r := range readers {
1150 if r != nil && *r != nil {
1151 (*r).Close()
1152 }
1153 }
1154 }()
1155
1156 pull, err := models.PullFromRecord(did, rkey, record, readers)
1157 if err != nil {
1158 return fmt.Errorf("failed to parse pull from record: %w", err)
1159 }
1160 if err := i.Validator.ValidatePull(pull); err != nil {
1161 return fmt.Errorf("failed to validate pull: %w", err)
1162 }
1163
1164 tx, err := ddb.BeginTx(ctx, nil)
1165 if err != nil {
1166 l.Error("failed to begin transaction", "err", err)
1167 return err
1168 }
1169 defer tx.Rollback()
1170
1171 err = db.PutPull(tx, pull)
1172 if err != nil {
1173 l.Error("failed to create pull", "err", err)
1174 return err
1175 }
1176
1177 err = tx.Commit()
1178 if err != nil {
1179 l.Error("failed to commit txn", "err", err)
1180 return err
1181 }
1182
1183 return nil
1184
1185 case jmodels.CommitOperationDelete:
1186 tx, err := ddb.BeginTx(ctx, nil)
1187 if err != nil {
1188 l.Error("failed to begin transaction", "err", err)
1189 return err
1190 }
1191 defer tx.Rollback()
1192
1193 if err := db.AbandonPulls(
1194 tx,
1195 orm.FilterEq("owner_did", did),
1196 orm.FilterEq("rkey", rkey),
1197 ); err != nil {
1198 l.Error("failed to abandon", "err", err)
1199 return fmt.Errorf("failed to abandon pull record: %w", err)
1200 }
1201 if err := tx.Commit(); err != nil {
1202 l.Error("failed to commit txn", "err", err)
1203 return err
1204 }
1205
1206 return nil
1207 }
1208
1209 return nil
1210}
1211
1212func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
1213 did := e.Did
1214 rkey := e.Commit.RKey
1215
1216 var err error
1217
1218 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1219 l.Info("ingesting record")
1220
1221 ddb, ok := i.Db.Execer.(*db.DB)
1222 if !ok {
1223 return fmt.Errorf("failed to index issue comment record, invalid db cast")
1224 }
1225
1226 switch e.Commit.Operation {
1227 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1228 raw := json.RawMessage(e.Commit.Record)
1229 record := tangled.RepoIssueComment{}
1230 err = json.Unmarshal(raw, &record)
1231 if err != nil {
1232 return fmt.Errorf("invalid record: %w", err)
1233 }
1234
1235 comment, err := models.IssueCommentFromRecord(did, rkey, record)
1236 if err != nil {
1237 return fmt.Errorf("failed to parse comment from record: %w", err)
1238 }
1239
1240 if err := i.Validator.ValidateIssueComment(comment); err != nil {
1241 return fmt.Errorf("failed to validate comment: %w", err)
1242 }
1243
1244 tx, err := ddb.Begin()
1245 if err != nil {
1246 return fmt.Errorf("failed to start transaction: %w", err)
1247 }
1248 defer tx.Rollback()
1249
1250 _, err = db.AddIssueComment(tx, *comment)
1251 if err != nil {
1252 return fmt.Errorf("failed to create issue comment: %w", err)
1253 }
1254
1255 return tx.Commit()
1256
1257 case jmodels.CommitOperationDelete:
1258 if err := db.DeleteIssueComments(
1259 ddb,
1260 orm.FilterEq("did", did),
1261 orm.FilterEq("rkey", rkey),
1262 ); err != nil {
1263 return fmt.Errorf("failed to delete issue comment record: %w", err)
1264 }
1265
1266 return nil
1267 }
1268
1269 return nil
1270}
1271
1272func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
1273 did := e.Did
1274 rkey := e.Commit.RKey
1275
1276 var err error
1277
1278 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1279 l.Info("ingesting record")
1280
1281 ddb, ok := i.Db.Execer.(*db.DB)
1282 if !ok {
1283 return fmt.Errorf("failed to index label definition, invalid db cast")
1284 }
1285
1286 switch e.Commit.Operation {
1287 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1288 raw := json.RawMessage(e.Commit.Record)
1289 record := tangled.LabelDefinition{}
1290 err = json.Unmarshal(raw, &record)
1291 if err != nil {
1292 return fmt.Errorf("invalid record: %w", err)
1293 }
1294
1295 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1296 if err != nil {
1297 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1298 }
1299
1300 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1301 return fmt.Errorf("failed to validate labeldef: %w", err)
1302 }
1303
1304 _, err = db.AddLabelDefinition(ddb, def)
1305 if err != nil {
1306 return fmt.Errorf("failed to create labeldef: %w", err)
1307 }
1308
1309 return nil
1310
1311 case jmodels.CommitOperationDelete:
1312 if err := db.DeleteLabelDefinition(
1313 ddb,
1314 orm.FilterEq("did", did),
1315 orm.FilterEq("rkey", rkey),
1316 ); err != nil {
1317 return fmt.Errorf("failed to delete labeldef record: %w", err)
1318 }
1319
1320 return nil
1321 }
1322
1323 return nil
1324}
1325
1326func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1327 did := e.Did
1328 rkey := e.Commit.RKey
1329
1330 var err error
1331
1332 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1333 l.Info("ingesting record")
1334
1335 ddb, ok := i.Db.Execer.(*db.DB)
1336 if !ok {
1337 return fmt.Errorf("failed to index label op, invalid db cast")
1338 }
1339
1340 switch e.Commit.Operation {
1341 case jmodels.CommitOperationCreate:
1342 raw := json.RawMessage(e.Commit.Record)
1343 record := tangled.LabelOp{}
1344 err = json.Unmarshal(raw, &record)
1345 if err != nil {
1346 return fmt.Errorf("invalid record: %w", err)
1347 }
1348
1349 subject := syntax.ATURI(record.Subject)
1350 collection := subject.Collection()
1351
1352 var repo *models.Repo
1353 switch collection {
1354 case tangled.RepoIssueNSID:
1355 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1356 if err != nil || len(i) != 1 {
1357 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1358 }
1359 repo = i[0].Repo
1360 default:
1361 return fmt.Errorf("unsupported label subject: %s", collection)
1362 }
1363
1364 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1365 if err != nil {
1366 return fmt.Errorf("failed to build label application ctx: %w", err)
1367 }
1368
1369 ops := models.LabelOpsFromRecord(did, rkey, record)
1370
1371 for _, o := range ops {
1372 def, ok := actx.Defs[o.OperandKey]
1373 if !ok {
1374 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1375 }
1376 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1377 return fmt.Errorf("failed to validate labelop: %w", err)
1378 }
1379 }
1380
1381 tx, err := ddb.Begin()
1382 if err != nil {
1383 return err
1384 }
1385 defer tx.Rollback()
1386
1387 for _, o := range ops {
1388 _, err = db.AddLabelOp(tx, &o)
1389 if err != nil {
1390 return fmt.Errorf("failed to add labelop: %w", err)
1391 }
1392 }
1393
1394 if err = tx.Commit(); err != nil {
1395 return err
1396 }
1397 }
1398
1399 return nil
1400}