forked from
tangled.org/core
Monorepo for Tangled
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "net/http"
13 "net/url"
14 "slices"
15 "sync"
16
17 "time"
18
19 "github.com/avast/retry-go/v4"
20 "github.com/bluesky-social/indigo/atproto/syntax"
21 jmodels "github.com/bluesky-social/jetstream/pkg/models"
22 "github.com/go-git/go-git/v5/plumbing"
23 "github.com/ipfs/go-cid"
24 "golang.org/x/sync/errgroup"
25 "tangled.org/core/api/tangled"
26 "tangled.org/core/appview/cache"
27 "tangled.org/core/appview/config"
28 "tangled.org/core/appview/db"
29 "tangled.org/core/appview/models"
30 "tangled.org/core/appview/serververify"
31 "tangled.org/core/appview/validator"
32 "tangled.org/core/idresolver"
33 "tangled.org/core/orm"
34 "tangled.org/core/rbac"
35)
36
37type Ingester struct {
38 Db db.DbWrapper
39 Enforcer *rbac.Enforcer
40 IdResolver *idresolver.Resolver
41 Cache *cache.Cache
42 Config *config.Config
43 Logger *slog.Logger
44 Validator *validator.Validator
45}
46
47type processFunc func(ctx context.Context, e *jmodels.Event) error
48
49func (i *Ingester) Ingest() processFunc {
50 return func(ctx context.Context, e *jmodels.Event) error {
51 var err error
52
53 l := i.Logger.With("kind", e.Kind)
54 switch e.Kind {
55 case jmodels.EventKindAccount:
56 // TODO: sync account state to db
57 if e.Account.Active {
58 break
59 }
60 // TODO: revoke sessions by DID
61 if *e.Account.Status == "deactivated" {
62 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
63 }
64 case jmodels.EventKindIdentity:
65 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
66 case jmodels.EventKindCommit:
67 switch e.Commit.Collection {
68 case tangled.GraphFollowNSID:
69 err = i.ingestFollow(e)
70 case tangled.FeedStarNSID:
71 err = i.ingestStar(e)
72 case tangled.PublicKeyNSID:
73 err = i.ingestPublicKey(e)
74 case tangled.RepoArtifactNSID:
75 err = i.ingestArtifact(e)
76 case tangled.ActorProfileNSID:
77 err = i.ingestProfile(ctx, e)
78 case tangled.SpindleMemberNSID:
79 err = i.ingestSpindleMember(ctx, e)
80 case tangled.SpindleNSID:
81 err = i.ingestSpindle(ctx, e)
82 case tangled.KnotMemberNSID:
83 err = i.ingestKnotMember(e)
84 case tangled.KnotNSID:
85 err = i.ingestKnot(e)
86 case tangled.StringNSID:
87 err = i.ingestString(e)
88 case tangled.RepoIssueNSID:
89 err = i.ingestIssue(ctx, e)
90 case tangled.RepoPullNSID:
91 err = i.ingestPull(ctx, e)
92 case tangled.RepoIssueCommentNSID:
93 err = i.ingestIssueComment(e)
94 case tangled.LabelDefinitionNSID:
95 err = i.ingestLabelDefinition(e)
96 case tangled.LabelOpNSID:
97 err = i.ingestLabelOp(e)
98 }
99 l = i.Logger.With("nsid", e.Commit.Collection)
100 }
101
102 if err != nil {
103 l.Warn("failed to ingest record, skipping", "err", err)
104 }
105
106 lastTimeUs := e.TimeUS + 1
107 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
108 l.Error("failed to save cursor", "err", saveErr)
109 }
110
111 return nil
112 }
113}
114
115func (i *Ingester) ingestStar(e *jmodels.Event) error {
116 var err error
117 did := e.Did
118
119 l := i.Logger.With("handler", "ingestStar")
120 l = l.With("nsid", e.Commit.Collection)
121
122 switch e.Commit.Operation {
123 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
124 var subjectUri syntax.ATURI
125
126 raw := json.RawMessage(e.Commit.Record)
127 record := tangled.FeedStar{}
128 err := json.Unmarshal(raw, &record)
129 if err != nil {
130 l.Error("invalid record", "err", err)
131 return err
132 }
133
134 star := &models.Star{
135 Did: did,
136 Rkey: e.Commit.RKey,
137 }
138
139 switch {
140 case record.SubjectDid != nil:
141 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid))
142 if repoErr == nil {
143 subjectUri = repo.RepoAt()
144 star.RepoAt = subjectUri
145 }
146 case record.Subject != nil:
147 subjectUri, err = syntax.ParseATURI(*record.Subject)
148 if err != nil {
149 l.Error("invalid record", "err", err)
150 return err
151 }
152 star.RepoAt = subjectUri
153 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String())
154 if repoErr == nil && repo.RepoDid != "" {
155 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil {
156 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
157 }
158 }
159 default:
160 l.Error("star record has neither subject nor subjectDid")
161 return fmt.Errorf("star record has neither subject nor subjectDid")
162 }
163 err = db.AddStar(i.Db, star)
164 case jmodels.CommitOperationDelete:
165 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
166 }
167
168 if err != nil {
169 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
170 }
171
172 return nil
173}
174
175func (i *Ingester) ingestFollow(e *jmodels.Event) error {
176 var err error
177 did := e.Did
178
179 l := i.Logger.With("handler", "ingestFollow")
180 l = l.With("nsid", e.Commit.Collection)
181
182 switch e.Commit.Operation {
183 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
184 raw := json.RawMessage(e.Commit.Record)
185 record := tangled.GraphFollow{}
186 err = json.Unmarshal(raw, &record)
187 if err != nil {
188 l.Error("invalid record", "err", err)
189 return err
190 }
191
192 err = db.AddFollow(i.Db, &models.Follow{
193 UserDid: did,
194 SubjectDid: record.Subject,
195 Rkey: e.Commit.RKey,
196 })
197 case jmodels.CommitOperationDelete:
198 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
199 }
200
201 if err != nil {
202 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
203 }
204
205 return nil
206}
207
208func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
209 did := e.Did
210 var err error
211
212 l := i.Logger.With("handler", "ingestPublicKey")
213 l = l.With("nsid", e.Commit.Collection)
214
215 switch e.Commit.Operation {
216 case jmodels.CommitOperationCreate:
217 l.Debug("processing add of pubkey")
218 raw := json.RawMessage(e.Commit.Record)
219 record := tangled.PublicKey{}
220 err = json.Unmarshal(raw, &record)
221 if err != nil {
222 l.Error("invalid record", "err", err)
223 return err
224 }
225
226 name := record.Name
227 key := record.Key
228 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
229 case jmodels.CommitOperationUpdate:
230 l.Debug("processing update of pubkey")
231 raw := json.RawMessage(e.Commit.Record)
232 record := tangled.PublicKey{}
233 err = json.Unmarshal(raw, &record)
234 if err != nil {
235 l.Error("invalid record", "err", err)
236 return err
237 }
238
239 name := record.Name
240 key := record.Key
241 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey)
242 case jmodels.CommitOperationDelete:
243 l.Debug("processing delete of pubkey")
244 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
245 }
246
247 if err != nil {
248 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
249 }
250
251 return nil
252}
253
254func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
255 did := e.Did
256 var err error
257
258 l := i.Logger.With("handler", "ingestArtifact")
259 l = l.With("nsid", e.Commit.Collection)
260
261 switch e.Commit.Operation {
262 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
263 raw := json.RawMessage(e.Commit.Record)
264 record := tangled.RepoArtifact{}
265 err = json.Unmarshal(raw, &record)
266 if err != nil {
267 l.Error("invalid record", "err", err)
268 return err
269 }
270
271 var repo *models.Repo
272 if record.RepoDid != nil && *record.RepoDid != "" {
273 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
274 if err != nil && !errors.Is(err, sql.ErrNoRows) {
275 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
276 }
277 }
278 if repo == nil && record.Repo != nil {
279 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
280 if parseErr != nil {
281 return parseErr
282 }
283 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
284 if err != nil {
285 return err
286 }
287 }
288 if repo == nil {
289 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
290 }
291
292 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push")
293 if err != nil || !ok {
294 return err
295 }
296
297 repoDid := repo.RepoDid
298 if repoDid == "" && record.RepoDid != nil {
299 repoDid = *record.RepoDid
300 }
301 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
302 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil {
303 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
304 }
305 }
306
307 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
308 if err != nil {
309 createdAt = time.Now()
310 }
311
312 artifact := models.Artifact{
313 Did: did,
314 Rkey: e.Commit.RKey,
315 RepoAt: repo.RepoAt(),
316 Tag: plumbing.Hash(record.Tag),
317 CreatedAt: createdAt,
318 BlobCid: cid.Cid(record.Artifact.Ref),
319 Name: record.Name,
320 Size: uint64(record.Artifact.Size),
321 MimeType: record.Artifact.MimeType,
322 }
323
324 err = db.AddArtifact(i.Db, artifact)
325 case jmodels.CommitOperationDelete:
326 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
327 }
328
329 if err != nil {
330 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
331 }
332
333 return nil
334}
335
336func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error {
337 did := e.Did
338 var err error
339
340 l := i.Logger.With("handler", "ingestProfile")
341 l = l.With("nsid", e.Commit.Collection)
342
343 if e.Commit.RKey != "self" {
344 return fmt.Errorf("ingestProfile only ingests `self` record")
345 }
346
347 switch e.Commit.Operation {
348 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
349 raw := json.RawMessage(e.Commit.Record)
350 record := tangled.ActorProfile{}
351 err = json.Unmarshal(raw, &record)
352 if err != nil {
353 l.Error("invalid record", "err", err)
354 return err
355 }
356
357 avatar := ""
358 if record.Avatar != nil {
359 avatar = record.Avatar.Ref.String()
360 }
361
362 description := ""
363 if record.Description != nil {
364 description = *record.Description
365 }
366
367 includeBluesky := record.Bluesky
368
369 pronouns := ""
370 if record.Pronouns != nil {
371 pronouns = *record.Pronouns
372 }
373
374 location := ""
375 if record.Location != nil {
376 location = *record.Location
377 }
378
379 var links [5]string
380 for i, l := range record.Links {
381 if i < 5 {
382 links[i] = l
383 }
384 }
385
386 var stats [2]models.VanityStat
387 for i, s := range record.Stats {
388 if i < 2 {
389 stats[i].Kind = models.ParseVanityStatKind(s)
390 }
391 }
392
393 var pinned [6]string
394 for i, r := range record.PinnedRepositories {
395 if i < 6 {
396 pinned[i] = r
397 }
398 }
399
400 var preferredHandle syntax.Handle
401 if record.PreferredHandle != nil {
402 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
403 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
404 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
405 preferredHandle = h
406 }
407 }
408 }
409
410 profile := models.Profile{
411 Did: did,
412 Avatar: avatar,
413 Description: description,
414 IncludeBluesky: includeBluesky,
415 Location: location,
416 Links: links,
417 Stats: stats,
418 PinnedRepos: pinned,
419 Pronouns: pronouns,
420 PreferredHandle: preferredHandle,
421 }
422
423 ddb, ok := i.Db.Execer.(*db.DB)
424 if !ok {
425 return fmt.Errorf("failed to index profile record, invalid db cast")
426 }
427
428 tx, err := ddb.Begin()
429 if err != nil {
430 return fmt.Errorf("failed to start transaction")
431 }
432
433 err = db.ValidateProfile(tx, &profile)
434 if err != nil {
435 return fmt.Errorf("invalid profile record")
436 }
437
438 err = db.UpsertProfile(tx, &profile)
439 if err == nil && i.Cache != nil {
440 pipe := i.Cache.Pipeline()
441 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did)
442 if preferredHandle != "" {
443 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL)
444 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL)
445 } else {
446 pipe.Del(ctx, didKey)
447 }
448 if _, execErr := pipe.Exec(ctx); execErr != nil {
449 l.Warn("failed to update preferred handle cache", "err", execErr)
450 }
451 }
452 case jmodels.CommitOperationDelete:
453 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
454 }
455
456 if err != nil {
457 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
458 }
459
460 return nil
461}
462
463func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
464 did := e.Did
465 var err error
466
467 l := i.Logger.With("handler", "ingestSpindleMember")
468 l = l.With("nsid", e.Commit.Collection)
469
470 switch e.Commit.Operation {
471 case jmodels.CommitOperationCreate:
472 raw := json.RawMessage(e.Commit.Record)
473 record := tangled.SpindleMember{}
474 err = json.Unmarshal(raw, &record)
475 if err != nil {
476 l.Error("invalid record", "err", err)
477 return err
478 }
479
480 // only spindle owner can invite to spindles
481 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
482 if err != nil || !ok {
483 return fmt.Errorf("failed to enforce permissions: %w", err)
484 }
485
486 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
487 if err != nil {
488 return err
489 }
490
491 if memberId.Handle.IsInvalidHandle() {
492 return err
493 }
494
495 ddb, ok := i.Db.Execer.(*db.DB)
496 if !ok {
497 return fmt.Errorf("invalid db cast")
498 }
499
500 err = db.AddSpindleMember(ddb, models.SpindleMember{
501 Did: syntax.DID(did),
502 Rkey: e.Commit.RKey,
503 Instance: record.Instance,
504 Subject: memberId.DID,
505 })
506 if !ok {
507 return fmt.Errorf("failed to add to db: %w", err)
508 }
509
510 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
511 if err != nil {
512 return fmt.Errorf("failed to update ACLs: %w", err)
513 }
514
515 l.Info("added spindle member")
516 case jmodels.CommitOperationDelete:
517 rkey := e.Commit.RKey
518
519 ddb, ok := i.Db.Execer.(*db.DB)
520 if !ok {
521 return fmt.Errorf("failed to index profile record, invalid db cast")
522 }
523
524 // get record from db first
525 members, err := db.GetSpindleMembers(
526 ddb,
527 orm.FilterEq("did", did),
528 orm.FilterEq("rkey", rkey),
529 )
530 if err != nil || len(members) != 1 {
531 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
532 }
533 member := members[0]
534
535 tx, err := ddb.Begin()
536 if err != nil {
537 return fmt.Errorf("failed to start txn: %w", err)
538 }
539
540 // remove record by rkey && update enforcer
541 if err = db.RemoveSpindleMember(
542 tx,
543 orm.FilterEq("did", did),
544 orm.FilterEq("rkey", rkey),
545 ); err != nil {
546 return fmt.Errorf("failed to remove from db: %w", err)
547 }
548
549 // update enforcer
550 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
551 if err != nil {
552 return fmt.Errorf("failed to update ACLs: %w", err)
553 }
554
555 if err = tx.Commit(); err != nil {
556 return fmt.Errorf("failed to commit txn: %w", err)
557 }
558
559 if err = i.Enforcer.E.SavePolicy(); err != nil {
560 return fmt.Errorf("failed to save ACLs: %w", err)
561 }
562
563 l.Info("removed spindle member")
564 }
565
566 return nil
567}
568
569func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
570 did := e.Did
571 var err error
572
573 l := i.Logger.With("handler", "ingestSpindle")
574 l = l.With("nsid", e.Commit.Collection)
575
576 switch e.Commit.Operation {
577 case jmodels.CommitOperationCreate:
578 raw := json.RawMessage(e.Commit.Record)
579 record := tangled.Spindle{}
580 err = json.Unmarshal(raw, &record)
581 if err != nil {
582 l.Error("invalid record", "err", err)
583 return err
584 }
585
586 instance := e.Commit.RKey
587
588 ddb, ok := i.Db.Execer.(*db.DB)
589 if !ok {
590 return fmt.Errorf("failed to index profile record, invalid db cast")
591 }
592
593 err := db.AddSpindle(ddb, models.Spindle{
594 Owner: syntax.DID(did),
595 Instance: instance,
596 })
597 if err != nil {
598 l.Error("failed to add spindle to db", "err", err, "instance", instance)
599 return err
600 }
601
602 err = retry.Do(
603 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
604 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
605 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
606 )
607 if err != nil {
608 l.Error("failed to verify spindle after retries", "err", err, "instance", instance)
609 return err
610 }
611
612 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
613 if err != nil {
614 return fmt.Errorf("failed to mark verified: %w", err)
615 }
616
617 return nil
618
619 case jmodels.CommitOperationDelete:
620 instance := e.Commit.RKey
621
622 ddb, ok := i.Db.Execer.(*db.DB)
623 if !ok {
624 return fmt.Errorf("failed to index profile record, invalid db cast")
625 }
626
627 // get record from db first
628 spindles, err := db.GetSpindles(
629 ctx,
630 ddb,
631 orm.FilterEq("owner", did),
632 orm.FilterEq("instance", instance),
633 )
634 if err != nil || len(spindles) != 1 {
635 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
636 }
637 spindle := spindles[0]
638
639 tx, err := ddb.Begin()
640 if err != nil {
641 return err
642 }
643 defer func() {
644 tx.Rollback()
645 i.Enforcer.E.LoadPolicy()
646 }()
647
648 // remove spindle members first
649 err = db.RemoveSpindleMember(
650 tx,
651 orm.FilterEq("owner", did),
652 orm.FilterEq("instance", instance),
653 )
654 if err != nil {
655 return err
656 }
657
658 err = db.DeleteSpindle(
659 tx,
660 orm.FilterEq("owner", did),
661 orm.FilterEq("instance", instance),
662 )
663 if err != nil {
664 return err
665 }
666
667 if spindle.Verified != nil {
668 err = i.Enforcer.RemoveSpindle(instance)
669 if err != nil {
670 return err
671 }
672 }
673
674 err = tx.Commit()
675 if err != nil {
676 return err
677 }
678
679 err = i.Enforcer.E.SavePolicy()
680 if err != nil {
681 return err
682 }
683 }
684
685 return nil
686}
687
688func (i *Ingester) ingestString(e *jmodels.Event) error {
689 did := e.Did
690 rkey := e.Commit.RKey
691
692 var err error
693
694 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
695 l.Info("ingesting record")
696
697 ddb, ok := i.Db.Execer.(*db.DB)
698 if !ok {
699 return fmt.Errorf("failed to index string record, invalid db cast")
700 }
701
702 switch e.Commit.Operation {
703 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
704 raw := json.RawMessage(e.Commit.Record)
705 record := tangled.String{}
706 err = json.Unmarshal(raw, &record)
707 if err != nil {
708 l.Error("invalid record", "err", err)
709 return err
710 }
711
712 string := models.StringFromRecord(did, rkey, record)
713
714 if err = i.Validator.ValidateString(&string); err != nil {
715 l.Error("invalid record", "err", err)
716 return err
717 }
718
719 if err = db.AddString(ddb, string); err != nil {
720 l.Error("failed to add string", "err", err)
721 return err
722 }
723
724 return nil
725
726 case jmodels.CommitOperationDelete:
727 if err := db.DeleteString(
728 ddb,
729 orm.FilterEq("did", did),
730 orm.FilterEq("rkey", rkey),
731 ); err != nil {
732 l.Error("failed to delete", "err", err)
733 return fmt.Errorf("failed to delete string record: %w", err)
734 }
735
736 return nil
737 }
738
739 return nil
740}
741
742func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
743 did := e.Did
744 var err error
745
746 l := i.Logger.With("handler", "ingestKnotMember")
747 l = l.With("nsid", e.Commit.Collection)
748
749 switch e.Commit.Operation {
750 case jmodels.CommitOperationCreate:
751 raw := json.RawMessage(e.Commit.Record)
752 record := tangled.KnotMember{}
753 err = json.Unmarshal(raw, &record)
754 if err != nil {
755 l.Error("invalid record", "err", err)
756 return err
757 }
758
759 // only knot owner can invite to knots
760 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
761 if err != nil || !ok {
762 return fmt.Errorf("failed to enforce permissions: %w", err)
763 }
764
765 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
766 if err != nil {
767 return err
768 }
769
770 if memberId.Handle.IsInvalidHandle() {
771 return err
772 }
773
774 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
775 if err != nil {
776 return fmt.Errorf("failed to update ACLs: %w", err)
777 }
778
779 l.Info("added knot member")
780 case jmodels.CommitOperationDelete:
781 // we don't store knot members in a table (like we do for spindle)
782 // and we can't remove this just yet. possibly fixed if we switch
783 // to either:
784 // 1. a knot_members table like with spindle and store the rkey
785 // 2. use the knot host as the rkey
786 //
787 // TODO: implement member deletion
788 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
789 }
790
791 return nil
792}
793
794func (i *Ingester) ingestKnot(e *jmodels.Event) error {
795 did := e.Did
796 var err error
797
798 l := i.Logger.With("handler", "ingestKnot")
799 l = l.With("nsid", e.Commit.Collection)
800
801 switch e.Commit.Operation {
802 case jmodels.CommitOperationCreate:
803 raw := json.RawMessage(e.Commit.Record)
804 record := tangled.Knot{}
805 err = json.Unmarshal(raw, &record)
806 if err != nil {
807 l.Error("invalid record", "err", err)
808 return err
809 }
810
811 domain := e.Commit.RKey
812
813 ddb, ok := i.Db.Execer.(*db.DB)
814 if !ok {
815 return fmt.Errorf("failed to index profile record, invalid db cast")
816 }
817
818 err := db.AddKnot(ddb, domain, did)
819 if err != nil {
820 l.Error("failed to add knot to db", "err", err, "domain", domain)
821 return err
822 }
823
824 err = retry.Do(
825 func() error {
826 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
827 },
828 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
829 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
830 )
831 if err != nil {
832 l.Error("failed to verify knot after retries", "err", err, "domain", domain)
833 return err
834 }
835
836 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
837 if err != nil {
838 return fmt.Errorf("failed to mark verified: %w", err)
839 }
840
841 return nil
842
843 case jmodels.CommitOperationDelete:
844 domain := e.Commit.RKey
845
846 ddb, ok := i.Db.Execer.(*db.DB)
847 if !ok {
848 return fmt.Errorf("failed to index knot record, invalid db cast")
849 }
850
851 // get record from db first
852 registrations, err := db.GetRegistrations(
853 ddb,
854 orm.FilterEq("domain", domain),
855 orm.FilterEq("did", did),
856 )
857 if err != nil {
858 return fmt.Errorf("failed to get registration: %w", err)
859 }
860 if len(registrations) != 1 {
861 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
862 }
863 registration := registrations[0]
864
865 tx, err := ddb.Begin()
866 if err != nil {
867 return err
868 }
869 defer func() {
870 tx.Rollback()
871 i.Enforcer.E.LoadPolicy()
872 }()
873
874 err = db.DeleteKnot(
875 tx,
876 orm.FilterEq("did", did),
877 orm.FilterEq("domain", domain),
878 )
879 if err != nil {
880 return err
881 }
882
883 if registration.Registered != nil {
884 err = i.Enforcer.RemoveKnot(domain)
885 if err != nil {
886 return err
887 }
888 }
889
890 err = tx.Commit()
891 if err != nil {
892 return err
893 }
894
895 err = i.Enforcer.E.SavePolicy()
896 if err != nil {
897 return err
898 }
899 }
900
901 return nil
902}
903func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
904 did := e.Did
905 rkey := e.Commit.RKey
906
907 var err error
908
909 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
910 l.Info("ingesting record")
911
912 ddb, ok := i.Db.Execer.(*db.DB)
913 if !ok {
914 return fmt.Errorf("failed to index issue record, invalid db cast")
915 }
916
917 switch e.Commit.Operation {
918 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
919 raw := json.RawMessage(e.Commit.Record)
920 record := tangled.RepoIssue{}
921 err = json.Unmarshal(raw, &record)
922 if err != nil {
923 l.Error("invalid record", "err", err)
924 return err
925 }
926
927 issue := models.IssueFromRecord(did, rkey, record)
928
929 if issue.RepoAt == "" {
930 return fmt.Errorf("issue record has no repo field")
931 }
932
933 if err := i.Validator.ValidateIssue(&issue); err != nil {
934 return fmt.Errorf("failed to validate issue: %w", err)
935 }
936
937 if record.Repo != nil {
938 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo)
939 if repoErr == nil && repo.RepoDid != "" {
940 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil {
941 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
942 }
943 }
944 }
945
946 tx, err := ddb.BeginTx(ctx, nil)
947 if err != nil {
948 l.Error("failed to begin transaction", "err", err)
949 return err
950 }
951 defer tx.Rollback()
952
953 err = db.PutIssue(tx, &issue)
954 if err != nil {
955 l.Error("failed to create issue", "err", err)
956 return err
957 }
958
959 err = tx.Commit()
960 if err != nil {
961 l.Error("failed to commit txn", "err", err)
962 return err
963 }
964
965 return nil
966
967 case jmodels.CommitOperationDelete:
968 tx, err := ddb.BeginTx(ctx, nil)
969 if err != nil {
970 l.Error("failed to begin transaction", "err", err)
971 return err
972 }
973 defer tx.Rollback()
974
975 if err := db.DeleteIssues(
976 tx,
977 did,
978 rkey,
979 ); err != nil {
980 l.Error("failed to delete", "err", err)
981 return fmt.Errorf("failed to delete issue record: %w", err)
982 }
983 if err := tx.Commit(); err != nil {
984 l.Error("failed to commit txn", "err", err)
985 return err
986 }
987
988 return nil
989 }
990
991 return nil
992}
993
994func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error {
995 did := e.Did
996 rkey := e.Commit.RKey
997
998 var err error
999
1000 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1001 l.Info("ingesting record")
1002
1003 ddb, ok := i.Db.Execer.(*db.DB)
1004 if !ok {
1005 return fmt.Errorf("failed to index pull record, invalid db cast")
1006 }
1007
1008 switch e.Commit.Operation {
1009 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1010 raw := json.RawMessage(e.Commit.Record)
1011 record := tangled.RepoPull{}
1012 err = json.Unmarshal(raw, &record)
1013 if err != nil {
1014 l.Error("invalid record", "err", err)
1015 return err
1016 }
1017
1018 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
1019 if err != nil {
1020 l.Error("failed to resolve did")
1021 return err
1022 }
1023
1024 // go through and fetch all blobs in parallel
1025 readers := make([]*io.ReadCloser, len(record.Rounds))
1026 var mu sync.Mutex
1027
1028 g, gctx := errgroup.WithContext(ctx)
1029
1030 for idx, b := range record.Rounds {
1031 g.Go(func() error {
1032 // for some reason, a blob is empty
1033 if b.PatchBlob == nil {
1034 return fmt.Errorf("missing patchBlob in round %d", idx)
1035 }
1036
1037 ownerPds := ownerId.PDSEndpoint()
1038 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1039 q := url.Query()
1040 q.Set("cid", b.PatchBlob.Ref.String())
1041 q.Set("did", did)
1042 url.RawQuery = q.Encode()
1043
1044 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1045 if err != nil {
1046 l.Error("failed to create request")
1047 return err
1048 }
1049 req.Header.Set("Content-Type", "application/json")
1050
1051 resp, err := http.DefaultClient.Do(req)
1052 if err != nil {
1053 l.Error("failed to make request")
1054 return err
1055 }
1056
1057 mu.Lock()
1058 readers[idx] = &resp.Body
1059 mu.Unlock()
1060
1061 return nil
1062 })
1063 }
1064
1065 if err := g.Wait(); err != nil {
1066 for _, r := range readers {
1067 if r != nil && *r != nil {
1068 (*r).Close()
1069 }
1070 }
1071 return err
1072 }
1073
1074 defer func() {
1075 for _, r := range readers {
1076 if r != nil && *r != nil {
1077 (*r).Close()
1078 }
1079 }
1080 }()
1081
1082 pull, err := models.PullFromRecord(did, rkey, record, readers)
1083 if err != nil {
1084 return fmt.Errorf("failed to parse pull from record: %w", err)
1085 }
1086 if err := i.Validator.ValidatePull(pull); err != nil {
1087 return fmt.Errorf("failed to validate pull: %w", err)
1088 }
1089
1090 tx, err := ddb.BeginTx(ctx, nil)
1091 if err != nil {
1092 l.Error("failed to begin transaction", "err", err)
1093 return err
1094 }
1095 defer tx.Rollback()
1096
1097 err = db.PutPull(tx, pull)
1098 if err != nil {
1099 l.Error("failed to create pull", "err", err)
1100 return err
1101 }
1102
1103 err = tx.Commit()
1104 if err != nil {
1105 l.Error("failed to commit txn", "err", err)
1106 return err
1107 }
1108
1109 return nil
1110
1111 case jmodels.CommitOperationDelete:
1112 tx, err := ddb.BeginTx(ctx, nil)
1113 if err != nil {
1114 l.Error("failed to begin transaction", "err", err)
1115 return err
1116 }
1117 defer tx.Rollback()
1118
1119 if err := db.AbandonPulls(
1120 tx,
1121 orm.FilterEq("owner_did", did),
1122 orm.FilterEq("rkey", rkey),
1123 ); err != nil {
1124 l.Error("failed to abandon", "err", err)
1125 return fmt.Errorf("failed to abandon pull record: %w", err)
1126 }
1127 if err := tx.Commit(); err != nil {
1128 l.Error("failed to commit txn", "err", err)
1129 return err
1130 }
1131
1132 return nil
1133 }
1134
1135 return nil
1136}
1137
1138func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
1139 did := e.Did
1140 rkey := e.Commit.RKey
1141
1142 var err error
1143
1144 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1145 l.Info("ingesting record")
1146
1147 ddb, ok := i.Db.Execer.(*db.DB)
1148 if !ok {
1149 return fmt.Errorf("failed to index issue comment record, invalid db cast")
1150 }
1151
1152 switch e.Commit.Operation {
1153 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1154 raw := json.RawMessage(e.Commit.Record)
1155 record := tangled.RepoIssueComment{}
1156 err = json.Unmarshal(raw, &record)
1157 if err != nil {
1158 return fmt.Errorf("invalid record: %w", err)
1159 }
1160
1161 comment, err := models.IssueCommentFromRecord(did, rkey, record)
1162 if err != nil {
1163 return fmt.Errorf("failed to parse comment from record: %w", err)
1164 }
1165
1166 if err := i.Validator.ValidateIssueComment(comment); err != nil {
1167 return fmt.Errorf("failed to validate comment: %w", err)
1168 }
1169
1170 tx, err := ddb.Begin()
1171 if err != nil {
1172 return fmt.Errorf("failed to start transaction: %w", err)
1173 }
1174 defer tx.Rollback()
1175
1176 _, err = db.AddIssueComment(tx, *comment)
1177 if err != nil {
1178 return fmt.Errorf("failed to create issue comment: %w", err)
1179 }
1180
1181 return tx.Commit()
1182
1183 case jmodels.CommitOperationDelete:
1184 if err := db.DeleteIssueComments(
1185 ddb,
1186 orm.FilterEq("did", did),
1187 orm.FilterEq("rkey", rkey),
1188 ); err != nil {
1189 return fmt.Errorf("failed to delete issue comment record: %w", err)
1190 }
1191
1192 return nil
1193 }
1194
1195 return nil
1196}
1197
1198func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
1199 did := e.Did
1200 rkey := e.Commit.RKey
1201
1202 var err error
1203
1204 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1205 l.Info("ingesting record")
1206
1207 ddb, ok := i.Db.Execer.(*db.DB)
1208 if !ok {
1209 return fmt.Errorf("failed to index label definition, invalid db cast")
1210 }
1211
1212 switch e.Commit.Operation {
1213 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1214 raw := json.RawMessage(e.Commit.Record)
1215 record := tangled.LabelDefinition{}
1216 err = json.Unmarshal(raw, &record)
1217 if err != nil {
1218 return fmt.Errorf("invalid record: %w", err)
1219 }
1220
1221 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1222 if err != nil {
1223 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1224 }
1225
1226 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1227 return fmt.Errorf("failed to validate labeldef: %w", err)
1228 }
1229
1230 _, err = db.AddLabelDefinition(ddb, def)
1231 if err != nil {
1232 return fmt.Errorf("failed to create labeldef: %w", err)
1233 }
1234
1235 return nil
1236
1237 case jmodels.CommitOperationDelete:
1238 if err := db.DeleteLabelDefinition(
1239 ddb,
1240 orm.FilterEq("did", did),
1241 orm.FilterEq("rkey", rkey),
1242 ); err != nil {
1243 return fmt.Errorf("failed to delete labeldef record: %w", err)
1244 }
1245
1246 return nil
1247 }
1248
1249 return nil
1250}
1251
1252func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1253 did := e.Did
1254 rkey := e.Commit.RKey
1255
1256 var err error
1257
1258 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1259 l.Info("ingesting record")
1260
1261 ddb, ok := i.Db.Execer.(*db.DB)
1262 if !ok {
1263 return fmt.Errorf("failed to index label op, invalid db cast")
1264 }
1265
1266 switch e.Commit.Operation {
1267 case jmodels.CommitOperationCreate:
1268 raw := json.RawMessage(e.Commit.Record)
1269 record := tangled.LabelOp{}
1270 err = json.Unmarshal(raw, &record)
1271 if err != nil {
1272 return fmt.Errorf("invalid record: %w", err)
1273 }
1274
1275 subject := syntax.ATURI(record.Subject)
1276 collection := subject.Collection()
1277
1278 var repo *models.Repo
1279 switch collection {
1280 case tangled.RepoIssueNSID:
1281 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1282 if err != nil || len(i) != 1 {
1283 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1284 }
1285 repo = i[0].Repo
1286 default:
1287 return fmt.Errorf("unsupported label subject: %s", collection)
1288 }
1289
1290 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1291 if err != nil {
1292 return fmt.Errorf("failed to build label application ctx: %w", err)
1293 }
1294
1295 ops := models.LabelOpsFromRecord(did, rkey, record)
1296
1297 for _, o := range ops {
1298 def, ok := actx.Defs[o.OperandKey]
1299 if !ok {
1300 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1301 }
1302 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1303 return fmt.Errorf("failed to validate labelop: %w", err)
1304 }
1305 }
1306
1307 tx, err := ddb.Begin()
1308 if err != nil {
1309 return err
1310 }
1311 defer tx.Rollback()
1312
1313 for _, o := range ops {
1314 _, err = db.AddLabelOp(tx, &o)
1315 if err != nil {
1316 return fmt.Errorf("failed to add labelop: %w", err)
1317 }
1318 }
1319
1320 if err = tx.Commit(); err != nil {
1321 return err
1322 }
1323 }
1324
1325 return nil
1326}