forked from
tangled.org/core
Monorepo for Tangled
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "log/slog"
10 "maps"
11 "slices"
12
13 "time"
14
15 "github.com/avast/retry-go/v4"
16 "github.com/bluesky-social/indigo/atproto/syntax"
17 jmodels "github.com/bluesky-social/jetstream/pkg/models"
18 "github.com/go-git/go-git/v5/plumbing"
19 "github.com/ipfs/go-cid"
20 "tangled.org/core/api/tangled"
21 "tangled.org/core/appview/config"
22 "tangled.org/core/appview/db"
23 "tangled.org/core/appview/models"
24 "tangled.org/core/appview/serververify"
25 "tangled.org/core/appview/validator"
26 "tangled.org/core/idresolver"
27 "tangled.org/core/orm"
28 "tangled.org/core/rbac"
29)
30
// Ingester bundles the dependencies used by the event handlers in this file.
type Ingester struct {
	// Db is the database wrapper handed to the db package helpers; handlers
	// that need transactions type-assert its Execer to *db.DB.
	Db db.DbWrapper
	// Enforcer is consulted for permission checks and updated when knot or
	// spindle membership changes.
	Enforcer *rbac.Enforcer
	// IdResolver resolves DIDs/handles and is invalidated on account and
	// identity events.
	IdResolver *idresolver.Resolver
	// Config is read for Core.Dev when running server verification.
	Config *config.Config
	// Logger is the base logger; each handler derives its own from it.
	Logger *slog.Logger
	// Validator validates strings, issues, comments and label records
	// before they are stored.
	Validator *validator.Validator
}
39
// processFunc handles a single jetstream event. Ingest returns a value of
// this type.
type processFunc func(ctx context.Context, e *jmodels.Event) error
41
42func (i *Ingester) Ingest() processFunc {
43 return func(ctx context.Context, e *jmodels.Event) error {
44 var err error
45
46 l := i.Logger.With("kind", e.Kind)
47 switch e.Kind {
48 case jmodels.EventKindAccount:
49 if !e.Account.Active && *e.Account.Status == "deactivated" {
50 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
51 }
52 case jmodels.EventKindIdentity:
53 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
54 case jmodels.EventKindCommit:
55 switch e.Commit.Collection {
56 case tangled.GraphFollowNSID:
57 err = i.ingestFollow(e)
58 case tangled.FeedStarNSID:
59 err = i.ingestStar(e)
60 case tangled.PublicKeyNSID:
61 err = i.ingestPublicKey(e)
62 case tangled.RepoArtifactNSID:
63 err = i.ingestArtifact(e)
64 case tangled.ActorProfileNSID:
65 err = i.ingestProfile(ctx, e)
66 case tangled.SpindleMemberNSID:
67 err = i.ingestSpindleMember(ctx, e)
68 case tangled.SpindleNSID:
69 err = i.ingestSpindle(ctx, e)
70 case tangled.KnotMemberNSID:
71 err = i.ingestKnotMember(e)
72 case tangled.KnotNSID:
73 err = i.ingestKnot(e)
74 case tangled.StringNSID:
75 err = i.ingestString(e)
76 case tangled.RepoIssueNSID:
77 err = i.ingestIssue(ctx, e)
78 case tangled.RepoIssueCommentNSID:
79 err = i.ingestIssueComment(e)
80 case tangled.LabelDefinitionNSID:
81 err = i.ingestLabelDefinition(e)
82 case tangled.LabelOpNSID:
83 err = i.ingestLabelOp(e)
84 }
85 l = i.Logger.With("nsid", e.Commit.Collection)
86 }
87
88 if err != nil {
89 l.Warn("failed to ingest record, skipping", "err", err)
90 }
91
92 lastTimeUs := e.TimeUS + 1
93 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
94 l.Error("failed to save cursor", "err", saveErr)
95 }
96
97 return nil
98 }
99}
100
101func (i *Ingester) ingestStar(e *jmodels.Event) error {
102 var err error
103 did := e.Did
104
105 l := i.Logger.With("handler", "ingestStar")
106 l = l.With("nsid", e.Commit.Collection)
107
108 switch e.Commit.Operation {
109 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
110 var subjectUri syntax.ATURI
111
112 raw := json.RawMessage(e.Commit.Record)
113 record := tangled.FeedStar{}
114 err := json.Unmarshal(raw, &record)
115 if err != nil {
116 l.Error("invalid record", "err", err)
117 return err
118 }
119
120 star := &models.Star{
121 Did: did,
122 Rkey: e.Commit.RKey,
123 }
124
125 switch {
126 case record.SubjectDid != nil:
127 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid))
128 if repoErr == nil {
129 subjectUri = repo.RepoAt()
130 star.RepoAt = subjectUri
131 }
132 case record.Subject != nil:
133 subjectUri, err = syntax.ParseATURI(*record.Subject)
134 if err != nil {
135 l.Error("invalid record", "err", err)
136 return err
137 }
138 star.RepoAt = subjectUri
139 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String())
140 if repoErr == nil && repo.RepoDid != "" {
141 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil {
142 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
143 }
144 }
145 default:
146 l.Error("star record has neither subject nor subjectDid")
147 return fmt.Errorf("star record has neither subject nor subjectDid")
148 }
149 err = db.AddStar(i.Db, star)
150 case jmodels.CommitOperationDelete:
151 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
152 }
153
154 if err != nil {
155 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
156 }
157
158 return nil
159}
160
161func (i *Ingester) ingestFollow(e *jmodels.Event) error {
162 var err error
163 did := e.Did
164
165 l := i.Logger.With("handler", "ingestFollow")
166 l = l.With("nsid", e.Commit.Collection)
167
168 switch e.Commit.Operation {
169 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
170 raw := json.RawMessage(e.Commit.Record)
171 record := tangled.GraphFollow{}
172 err = json.Unmarshal(raw, &record)
173 if err != nil {
174 l.Error("invalid record", "err", err)
175 return err
176 }
177
178 err = db.AddFollow(i.Db, &models.Follow{
179 UserDid: did,
180 SubjectDid: record.Subject,
181 Rkey: e.Commit.RKey,
182 })
183 case jmodels.CommitOperationDelete:
184 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
185 }
186
187 if err != nil {
188 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
189 }
190
191 return nil
192}
193
194func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
195 did := e.Did
196 var err error
197
198 l := i.Logger.With("handler", "ingestPublicKey")
199 l = l.With("nsid", e.Commit.Collection)
200
201 switch e.Commit.Operation {
202 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
203 l.Debug("processing add of pubkey")
204 raw := json.RawMessage(e.Commit.Record)
205 record := tangled.PublicKey{}
206 err = json.Unmarshal(raw, &record)
207 if err != nil {
208 l.Error("invalid record", "err", err)
209 return err
210 }
211
212 name := record.Name
213 key := record.Key
214 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
215 case jmodels.CommitOperationDelete:
216 l.Debug("processing delete of pubkey")
217 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
218 }
219
220 if err != nil {
221 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
222 }
223
224 return nil
225}
226
227func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
228 did := e.Did
229 var err error
230
231 l := i.Logger.With("handler", "ingestArtifact")
232 l = l.With("nsid", e.Commit.Collection)
233
234 switch e.Commit.Operation {
235 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
236 raw := json.RawMessage(e.Commit.Record)
237 record := tangled.RepoArtifact{}
238 err = json.Unmarshal(raw, &record)
239 if err != nil {
240 l.Error("invalid record", "err", err)
241 return err
242 }
243
244 var repo *models.Repo
245 if record.RepoDid != nil && *record.RepoDid != "" {
246 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
247 if err != nil && !errors.Is(err, sql.ErrNoRows) {
248 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
249 }
250 }
251 if repo == nil && record.Repo != nil {
252 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
253 if parseErr != nil {
254 return parseErr
255 }
256 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
257 if err != nil {
258 return err
259 }
260 }
261 if repo == nil {
262 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
263 }
264
265 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push")
266 if err != nil || !ok {
267 return err
268 }
269
270 repoDid := repo.RepoDid
271 if repoDid == "" && record.RepoDid != nil {
272 repoDid = *record.RepoDid
273 }
274 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
275 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil {
276 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
277 }
278 }
279
280 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
281 if err != nil {
282 createdAt = time.Now()
283 }
284
285 artifact := models.Artifact{
286 Did: did,
287 Rkey: e.Commit.RKey,
288 RepoAt: repo.RepoAt(),
289 Tag: plumbing.Hash(record.Tag),
290 CreatedAt: createdAt,
291 BlobCid: cid.Cid(record.Artifact.Ref),
292 Name: record.Name,
293 Size: uint64(record.Artifact.Size),
294 MimeType: record.Artifact.MimeType,
295 }
296
297 err = db.AddArtifact(i.Db, artifact)
298 case jmodels.CommitOperationDelete:
299 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
300 }
301
302 if err != nil {
303 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
304 }
305
306 return nil
307}
308
309func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error {
310 did := e.Did
311 var err error
312
313 l := i.Logger.With("handler", "ingestProfile")
314 l = l.With("nsid", e.Commit.Collection)
315
316 if e.Commit.RKey != "self" {
317 return fmt.Errorf("ingestProfile only ingests `self` record")
318 }
319
320 switch e.Commit.Operation {
321 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
322 raw := json.RawMessage(e.Commit.Record)
323 record := tangled.ActorProfile{}
324 err = json.Unmarshal(raw, &record)
325 if err != nil {
326 l.Error("invalid record", "err", err)
327 return err
328 }
329
330 avatar := ""
331 if record.Avatar != nil {
332 avatar = record.Avatar.Ref.String()
333 }
334
335 description := ""
336 if record.Description != nil {
337 description = *record.Description
338 }
339
340 includeBluesky := record.Bluesky
341
342 pronouns := ""
343 if record.Pronouns != nil {
344 pronouns = *record.Pronouns
345 }
346
347 location := ""
348 if record.Location != nil {
349 location = *record.Location
350 }
351
352 var links [5]string
353 for i, l := range record.Links {
354 if i < 5 {
355 links[i] = l
356 }
357 }
358
359 var stats [2]models.VanityStat
360 for i, s := range record.Stats {
361 if i < 2 {
362 stats[i].Kind = models.ParseVanityStatKind(s)
363 }
364 }
365
366 var pinned [6]string
367 for i, r := range record.PinnedRepositories {
368 if i < 6 {
369 pinned[i] = r
370 }
371 }
372
373 var preferredHandle syntax.Handle
374 if record.PreferredHandle != nil {
375 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
376 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
377 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
378 preferredHandle = h
379 }
380 }
381 }
382
383 profile := models.Profile{
384 Did: did,
385 Avatar: avatar,
386 Description: description,
387 IncludeBluesky: includeBluesky,
388 Location: location,
389 Links: links,
390 Stats: stats,
391 PinnedRepos: pinned,
392 Pronouns: pronouns,
393 PreferredHandle: preferredHandle,
394 }
395
396 ddb, ok := i.Db.Execer.(*db.DB)
397 if !ok {
398 return fmt.Errorf("failed to index profile record, invalid db cast")
399 }
400
401 tx, err := ddb.Begin()
402 if err != nil {
403 return fmt.Errorf("failed to start transaction")
404 }
405
406 err = db.ValidateProfile(tx, &profile)
407 if err != nil {
408 return fmt.Errorf("invalid profile record")
409 }
410
411 err = db.UpsertProfile(tx, &profile)
412 case jmodels.CommitOperationDelete:
413 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
414 }
415
416 if err != nil {
417 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
418 }
419
420 return nil
421}
422
423func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
424 did := e.Did
425 var err error
426
427 l := i.Logger.With("handler", "ingestSpindleMember")
428 l = l.With("nsid", e.Commit.Collection)
429
430 switch e.Commit.Operation {
431 case jmodels.CommitOperationCreate:
432 raw := json.RawMessage(e.Commit.Record)
433 record := tangled.SpindleMember{}
434 err = json.Unmarshal(raw, &record)
435 if err != nil {
436 l.Error("invalid record", "err", err)
437 return err
438 }
439
440 // only spindle owner can invite to spindles
441 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
442 if err != nil || !ok {
443 return fmt.Errorf("failed to enforce permissions: %w", err)
444 }
445
446 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
447 if err != nil {
448 return err
449 }
450
451 if memberId.Handle.IsInvalidHandle() {
452 return err
453 }
454
455 ddb, ok := i.Db.Execer.(*db.DB)
456 if !ok {
457 return fmt.Errorf("invalid db cast")
458 }
459
460 err = db.AddSpindleMember(ddb, models.SpindleMember{
461 Did: syntax.DID(did),
462 Rkey: e.Commit.RKey,
463 Instance: record.Instance,
464 Subject: memberId.DID,
465 })
466 if !ok {
467 return fmt.Errorf("failed to add to db: %w", err)
468 }
469
470 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
471 if err != nil {
472 return fmt.Errorf("failed to update ACLs: %w", err)
473 }
474
475 l.Info("added spindle member")
476 case jmodels.CommitOperationDelete:
477 rkey := e.Commit.RKey
478
479 ddb, ok := i.Db.Execer.(*db.DB)
480 if !ok {
481 return fmt.Errorf("failed to index profile record, invalid db cast")
482 }
483
484 // get record from db first
485 members, err := db.GetSpindleMembers(
486 ddb,
487 orm.FilterEq("did", did),
488 orm.FilterEq("rkey", rkey),
489 )
490 if err != nil || len(members) != 1 {
491 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
492 }
493 member := members[0]
494
495 tx, err := ddb.Begin()
496 if err != nil {
497 return fmt.Errorf("failed to start txn: %w", err)
498 }
499
500 // remove record by rkey && update enforcer
501 if err = db.RemoveSpindleMember(
502 tx,
503 orm.FilterEq("did", did),
504 orm.FilterEq("rkey", rkey),
505 ); err != nil {
506 return fmt.Errorf("failed to remove from db: %w", err)
507 }
508
509 // update enforcer
510 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
511 if err != nil {
512 return fmt.Errorf("failed to update ACLs: %w", err)
513 }
514
515 if err = tx.Commit(); err != nil {
516 return fmt.Errorf("failed to commit txn: %w", err)
517 }
518
519 if err = i.Enforcer.E.SavePolicy(); err != nil {
520 return fmt.Errorf("failed to save ACLs: %w", err)
521 }
522
523 l.Info("removed spindle member")
524 }
525
526 return nil
527}
528
529func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
530 did := e.Did
531 var err error
532
533 l := i.Logger.With("handler", "ingestSpindle")
534 l = l.With("nsid", e.Commit.Collection)
535
536 switch e.Commit.Operation {
537 case jmodels.CommitOperationCreate:
538 raw := json.RawMessage(e.Commit.Record)
539 record := tangled.Spindle{}
540 err = json.Unmarshal(raw, &record)
541 if err != nil {
542 l.Error("invalid record", "err", err)
543 return err
544 }
545
546 instance := e.Commit.RKey
547
548 ddb, ok := i.Db.Execer.(*db.DB)
549 if !ok {
550 return fmt.Errorf("failed to index profile record, invalid db cast")
551 }
552
553 err := db.AddSpindle(ddb, models.Spindle{
554 Owner: syntax.DID(did),
555 Instance: instance,
556 })
557 if err != nil {
558 l.Error("failed to add spindle to db", "err", err, "instance", instance)
559 return err
560 }
561
562 err = retry.Do(
563 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
564 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
565 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
566 )
567 if err != nil {
568 l.Error("failed to verify spindle after retries", "err", err, "instance", instance)
569 return err
570 }
571
572 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
573 if err != nil {
574 return fmt.Errorf("failed to mark verified: %w", err)
575 }
576
577 return nil
578
579 case jmodels.CommitOperationDelete:
580 instance := e.Commit.RKey
581
582 ddb, ok := i.Db.Execer.(*db.DB)
583 if !ok {
584 return fmt.Errorf("failed to index profile record, invalid db cast")
585 }
586
587 // get record from db first
588 spindles, err := db.GetSpindles(
589 ctx,
590 ddb,
591 orm.FilterEq("owner", did),
592 orm.FilterEq("instance", instance),
593 )
594 if err != nil || len(spindles) != 1 {
595 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
596 }
597 spindle := spindles[0]
598
599 tx, err := ddb.Begin()
600 if err != nil {
601 return err
602 }
603 defer func() {
604 tx.Rollback()
605 i.Enforcer.E.LoadPolicy()
606 }()
607
608 // remove spindle members first
609 err = db.RemoveSpindleMember(
610 tx,
611 orm.FilterEq("owner", did),
612 orm.FilterEq("instance", instance),
613 )
614 if err != nil {
615 return err
616 }
617
618 err = db.DeleteSpindle(
619 tx,
620 orm.FilterEq("owner", did),
621 orm.FilterEq("instance", instance),
622 )
623 if err != nil {
624 return err
625 }
626
627 if spindle.Verified != nil {
628 err = i.Enforcer.RemoveSpindle(instance)
629 if err != nil {
630 return err
631 }
632 }
633
634 err = tx.Commit()
635 if err != nil {
636 return err
637 }
638
639 err = i.Enforcer.E.SavePolicy()
640 if err != nil {
641 return err
642 }
643 }
644
645 return nil
646}
647
648func (i *Ingester) ingestString(e *jmodels.Event) error {
649 did := e.Did
650 rkey := e.Commit.RKey
651
652 var err error
653
654 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
655 l.Info("ingesting record")
656
657 ddb, ok := i.Db.Execer.(*db.DB)
658 if !ok {
659 return fmt.Errorf("failed to index string record, invalid db cast")
660 }
661
662 switch e.Commit.Operation {
663 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
664 raw := json.RawMessage(e.Commit.Record)
665 record := tangled.String{}
666 err = json.Unmarshal(raw, &record)
667 if err != nil {
668 l.Error("invalid record", "err", err)
669 return err
670 }
671
672 string := models.StringFromRecord(did, rkey, record)
673
674 if err = i.Validator.ValidateString(&string); err != nil {
675 l.Error("invalid record", "err", err)
676 return err
677 }
678
679 if err = db.AddString(ddb, string); err != nil {
680 l.Error("failed to add string", "err", err)
681 return err
682 }
683
684 return nil
685
686 case jmodels.CommitOperationDelete:
687 if err := db.DeleteString(
688 ddb,
689 orm.FilterEq("did", did),
690 orm.FilterEq("rkey", rkey),
691 ); err != nil {
692 l.Error("failed to delete", "err", err)
693 return fmt.Errorf("failed to delete string record: %w", err)
694 }
695
696 return nil
697 }
698
699 return nil
700}
701
702func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
703 did := e.Did
704 var err error
705
706 l := i.Logger.With("handler", "ingestKnotMember")
707 l = l.With("nsid", e.Commit.Collection)
708
709 switch e.Commit.Operation {
710 case jmodels.CommitOperationCreate:
711 raw := json.RawMessage(e.Commit.Record)
712 record := tangled.KnotMember{}
713 err = json.Unmarshal(raw, &record)
714 if err != nil {
715 l.Error("invalid record", "err", err)
716 return err
717 }
718
719 // only knot owner can invite to knots
720 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
721 if err != nil || !ok {
722 return fmt.Errorf("failed to enforce permissions: %w", err)
723 }
724
725 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
726 if err != nil {
727 return err
728 }
729
730 if memberId.Handle.IsInvalidHandle() {
731 return err
732 }
733
734 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
735 if err != nil {
736 return fmt.Errorf("failed to update ACLs: %w", err)
737 }
738
739 l.Info("added knot member")
740 case jmodels.CommitOperationDelete:
741 // we don't store knot members in a table (like we do for spindle)
742 // and we can't remove this just yet. possibly fixed if we switch
743 // to either:
744 // 1. a knot_members table like with spindle and store the rkey
745 // 2. use the knot host as the rkey
746 //
747 // TODO: implement member deletion
748 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
749 }
750
751 return nil
752}
753
754func (i *Ingester) ingestKnot(e *jmodels.Event) error {
755 did := e.Did
756 var err error
757
758 l := i.Logger.With("handler", "ingestKnot")
759 l = l.With("nsid", e.Commit.Collection)
760
761 switch e.Commit.Operation {
762 case jmodels.CommitOperationCreate:
763 raw := json.RawMessage(e.Commit.Record)
764 record := tangled.Knot{}
765 err = json.Unmarshal(raw, &record)
766 if err != nil {
767 l.Error("invalid record", "err", err)
768 return err
769 }
770
771 domain := e.Commit.RKey
772
773 ddb, ok := i.Db.Execer.(*db.DB)
774 if !ok {
775 return fmt.Errorf("failed to index profile record, invalid db cast")
776 }
777
778 err := db.AddKnot(ddb, domain, did)
779 if err != nil {
780 l.Error("failed to add knot to db", "err", err, "domain", domain)
781 return err
782 }
783
784 err = retry.Do(
785 func() error {
786 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
787 },
788 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
789 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
790 )
791 if err != nil {
792 l.Error("failed to verify knot after retries", "err", err, "domain", domain)
793 return err
794 }
795
796 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
797 if err != nil {
798 return fmt.Errorf("failed to mark verified: %w", err)
799 }
800
801 return nil
802
803 case jmodels.CommitOperationDelete:
804 domain := e.Commit.RKey
805
806 ddb, ok := i.Db.Execer.(*db.DB)
807 if !ok {
808 return fmt.Errorf("failed to index knot record, invalid db cast")
809 }
810
811 // get record from db first
812 registrations, err := db.GetRegistrations(
813 ddb,
814 orm.FilterEq("domain", domain),
815 orm.FilterEq("did", did),
816 )
817 if err != nil {
818 return fmt.Errorf("failed to get registration: %w", err)
819 }
820 if len(registrations) != 1 {
821 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
822 }
823 registration := registrations[0]
824
825 tx, err := ddb.Begin()
826 if err != nil {
827 return err
828 }
829 defer func() {
830 tx.Rollback()
831 i.Enforcer.E.LoadPolicy()
832 }()
833
834 err = db.DeleteKnot(
835 tx,
836 orm.FilterEq("did", did),
837 orm.FilterEq("domain", domain),
838 )
839 if err != nil {
840 return err
841 }
842
843 if registration.Registered != nil {
844 err = i.Enforcer.RemoveKnot(domain)
845 if err != nil {
846 return err
847 }
848 }
849
850 err = tx.Commit()
851 if err != nil {
852 return err
853 }
854
855 err = i.Enforcer.E.SavePolicy()
856 if err != nil {
857 return err
858 }
859 }
860
861 return nil
862}
863func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
864 did := e.Did
865 rkey := e.Commit.RKey
866
867 var err error
868
869 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
870 l.Info("ingesting record")
871
872 ddb, ok := i.Db.Execer.(*db.DB)
873 if !ok {
874 return fmt.Errorf("failed to index issue record, invalid db cast")
875 }
876
877 switch e.Commit.Operation {
878 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
879 raw := json.RawMessage(e.Commit.Record)
880 record := tangled.RepoIssue{}
881 err = json.Unmarshal(raw, &record)
882 if err != nil {
883 l.Error("invalid record", "err", err)
884 return err
885 }
886
887 issue := models.IssueFromRecord(did, rkey, record)
888
889 if issue.RepoAt == "" {
890 return fmt.Errorf("issue record has no repo field")
891 }
892
893 if err := i.Validator.ValidateIssue(&issue); err != nil {
894 return fmt.Errorf("failed to validate issue: %w", err)
895 }
896
897 if record.Repo != nil {
898 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo)
899 if repoErr == nil && repo.RepoDid != "" {
900 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil {
901 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
902 }
903 }
904 }
905
906 tx, err := ddb.BeginTx(ctx, nil)
907 if err != nil {
908 l.Error("failed to begin transaction", "err", err)
909 return err
910 }
911 defer tx.Rollback()
912
913 err = db.PutIssue(tx, &issue)
914 if err != nil {
915 l.Error("failed to create issue", "err", err)
916 return err
917 }
918
919 err = tx.Commit()
920 if err != nil {
921 l.Error("failed to commit txn", "err", err)
922 return err
923 }
924
925 return nil
926
927 case jmodels.CommitOperationDelete:
928 tx, err := ddb.BeginTx(ctx, nil)
929 if err != nil {
930 l.Error("failed to begin transaction", "err", err)
931 return err
932 }
933 defer tx.Rollback()
934
935 if err := db.DeleteIssues(
936 tx,
937 did,
938 rkey,
939 ); err != nil {
940 l.Error("failed to delete", "err", err)
941 return fmt.Errorf("failed to delete issue record: %w", err)
942 }
943 if err := tx.Commit(); err != nil {
944 l.Error("failed to commit txn", "err", err)
945 return err
946 }
947
948 return nil
949 }
950
951 return nil
952}
953
954func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
955 did := e.Did
956 rkey := e.Commit.RKey
957
958 var err error
959
960 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
961 l.Info("ingesting record")
962
963 ddb, ok := i.Db.Execer.(*db.DB)
964 if !ok {
965 return fmt.Errorf("failed to index issue comment record, invalid db cast")
966 }
967
968 switch e.Commit.Operation {
969 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
970 raw := json.RawMessage(e.Commit.Record)
971 record := tangled.RepoIssueComment{}
972 err = json.Unmarshal(raw, &record)
973 if err != nil {
974 return fmt.Errorf("invalid record: %w", err)
975 }
976
977 comment, err := models.IssueCommentFromRecord(did, rkey, record)
978 if err != nil {
979 return fmt.Errorf("failed to parse comment from record: %w", err)
980 }
981
982 if err := i.Validator.ValidateIssueComment(comment); err != nil {
983 return fmt.Errorf("failed to validate comment: %w", err)
984 }
985
986 tx, err := ddb.Begin()
987 if err != nil {
988 return fmt.Errorf("failed to start transaction: %w", err)
989 }
990 defer tx.Rollback()
991
992 _, err = db.AddIssueComment(tx, *comment)
993 if err != nil {
994 return fmt.Errorf("failed to create issue comment: %w", err)
995 }
996
997 return tx.Commit()
998
999 case jmodels.CommitOperationDelete:
1000 if err := db.DeleteIssueComments(
1001 ddb,
1002 orm.FilterEq("did", did),
1003 orm.FilterEq("rkey", rkey),
1004 ); err != nil {
1005 return fmt.Errorf("failed to delete issue comment record: %w", err)
1006 }
1007
1008 return nil
1009 }
1010
1011 return nil
1012}
1013
1014func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
1015 did := e.Did
1016 rkey := e.Commit.RKey
1017
1018 var err error
1019
1020 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1021 l.Info("ingesting record")
1022
1023 ddb, ok := i.Db.Execer.(*db.DB)
1024 if !ok {
1025 return fmt.Errorf("failed to index label definition, invalid db cast")
1026 }
1027
1028 switch e.Commit.Operation {
1029 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1030 raw := json.RawMessage(e.Commit.Record)
1031 record := tangled.LabelDefinition{}
1032 err = json.Unmarshal(raw, &record)
1033 if err != nil {
1034 return fmt.Errorf("invalid record: %w", err)
1035 }
1036
1037 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1038 if err != nil {
1039 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1040 }
1041
1042 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1043 return fmt.Errorf("failed to validate labeldef: %w", err)
1044 }
1045
1046 _, err = db.AddLabelDefinition(ddb, def)
1047 if err != nil {
1048 return fmt.Errorf("failed to create labeldef: %w", err)
1049 }
1050
1051 return nil
1052
1053 case jmodels.CommitOperationDelete:
1054 if err := db.DeleteLabelDefinition(
1055 ddb,
1056 orm.FilterEq("did", did),
1057 orm.FilterEq("rkey", rkey),
1058 ); err != nil {
1059 return fmt.Errorf("failed to delete labeldef record: %w", err)
1060 }
1061
1062 return nil
1063 }
1064
1065 return nil
1066}
1067
1068func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1069 did := e.Did
1070 rkey := e.Commit.RKey
1071
1072 var err error
1073
1074 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1075 l.Info("ingesting record")
1076
1077 ddb, ok := i.Db.Execer.(*db.DB)
1078 if !ok {
1079 return fmt.Errorf("failed to index label op, invalid db cast")
1080 }
1081
1082 switch e.Commit.Operation {
1083 case jmodels.CommitOperationCreate:
1084 raw := json.RawMessage(e.Commit.Record)
1085 record := tangled.LabelOp{}
1086 err = json.Unmarshal(raw, &record)
1087 if err != nil {
1088 return fmt.Errorf("invalid record: %w", err)
1089 }
1090
1091 subject := syntax.ATURI(record.Subject)
1092 collection := subject.Collection()
1093
1094 var repo *models.Repo
1095 switch collection {
1096 case tangled.RepoIssueNSID:
1097 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1098 if err != nil || len(i) != 1 {
1099 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1100 }
1101 repo = i[0].Repo
1102 default:
1103 return fmt.Errorf("unsupport label subject: %s", collection)
1104 }
1105
1106 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1107 if err != nil {
1108 return fmt.Errorf("failed to build label application ctx: %w", err)
1109 }
1110
1111 ops := models.LabelOpsFromRecord(did, rkey, record)
1112
1113 for _, o := range ops {
1114 def, ok := actx.Defs[o.OperandKey]
1115 if !ok {
1116 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1117 }
1118 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1119 return fmt.Errorf("failed to validate labelop: %w", err)
1120 }
1121 }
1122
1123 tx, err := ddb.Begin()
1124 if err != nil {
1125 return err
1126 }
1127 defer tx.Rollback()
1128
1129 for _, o := range ops {
1130 _, err = db.AddLabelOp(tx, &o)
1131 if err != nil {
1132 return fmt.Errorf("failed to add labelop: %w", err)
1133 }
1134 }
1135
1136 if err = tx.Commit(); err != nil {
1137 return err
1138 }
1139 }
1140
1141 return nil
1142}