forked from
tangled.org/core
Monorepo for Tangled
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "log/slog"
10 "maps"
11 "slices"
12
13 "time"
14
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 jmodels "github.com/bluesky-social/jetstream/pkg/models"
17 "github.com/go-git/go-git/v5/plumbing"
18 "github.com/ipfs/go-cid"
19 "tangled.org/core/api/tangled"
20 "tangled.org/core/appview/config"
21 "tangled.org/core/appview/db"
22 "tangled.org/core/appview/models"
23 "tangled.org/core/appview/serververify"
24 "tangled.org/core/appview/validator"
25 "tangled.org/core/idresolver"
26 "tangled.org/core/orm"
27 "tangled.org/core/rbac"
28)
29
30type Ingester struct {
31 Db db.DbWrapper
32 Enforcer *rbac.Enforcer
33 IdResolver *idresolver.Resolver
34 Config *config.Config
35 Logger *slog.Logger
36 Validator *validator.Validator
37}
38
39type processFunc func(ctx context.Context, e *jmodels.Event) error
40
41func (i *Ingester) Ingest() processFunc {
42 return func(ctx context.Context, e *jmodels.Event) error {
43 var err error
44 defer func() {
45 eventTime := e.TimeUS
46 lastTimeUs := eventTime + 1
47 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
48 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
49 }
50 }()
51
52 l := i.Logger.With("kind", e.Kind)
53 switch e.Kind {
54 case jmodels.EventKindAccount:
55 if !e.Account.Active && *e.Account.Status == "deactivated" {
56 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
57 }
58 case jmodels.EventKindIdentity:
59 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
60 case jmodels.EventKindCommit:
61 switch e.Commit.Collection {
62 case tangled.GraphFollowNSID:
63 err = i.ingestFollow(e)
64 case tangled.FeedStarNSID:
65 err = i.ingestStar(e)
66 case tangled.PublicKeyNSID:
67 err = i.ingestPublicKey(e)
68 case tangled.RepoArtifactNSID:
69 err = i.ingestArtifact(e)
70 case tangled.ActorProfileNSID:
71 err = i.ingestProfile(ctx, e)
72 case tangled.SpindleMemberNSID:
73 err = i.ingestSpindleMember(ctx, e)
74 case tangled.SpindleNSID:
75 err = i.ingestSpindle(ctx, e)
76 case tangled.KnotMemberNSID:
77 err = i.ingestKnotMember(e)
78 case tangled.KnotNSID:
79 err = i.ingestKnot(e)
80 case tangled.StringNSID:
81 err = i.ingestString(e)
82 case tangled.RepoIssueNSID:
83 err = i.ingestIssue(ctx, e)
84 case tangled.RepoIssueCommentNSID:
85 err = i.ingestIssueComment(e)
86 case tangled.LabelDefinitionNSID:
87 err = i.ingestLabelDefinition(e)
88 case tangled.LabelOpNSID:
89 err = i.ingestLabelOp(e)
90 }
91 l = i.Logger.With("nsid", e.Commit.Collection)
92 }
93
94 if err != nil {
95 l.Warn("refused to ingest record", "err", err)
96 }
97
98 return nil
99 }
100}
101
102func (i *Ingester) ingestStar(e *jmodels.Event) error {
103 var err error
104 did := e.Did
105
106 l := i.Logger.With("handler", "ingestStar")
107 l = l.With("nsid", e.Commit.Collection)
108
109 switch e.Commit.Operation {
110 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
111 var subjectUri syntax.ATURI
112
113 raw := json.RawMessage(e.Commit.Record)
114 record := tangled.FeedStar{}
115 err := json.Unmarshal(raw, &record)
116 if err != nil {
117 l.Error("invalid record", "err", err)
118 return err
119 }
120
121 star := &models.Star{
122 Did: did,
123 Rkey: e.Commit.RKey,
124 }
125
126 switch {
127 case record.SubjectDid != nil:
128 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid))
129 if repoErr == nil {
130 subjectUri = repo.RepoAt()
131 star.RepoAt = subjectUri
132 }
133 case record.Subject != nil:
134 subjectUri, err = syntax.ParseATURI(*record.Subject)
135 if err != nil {
136 l.Error("invalid record", "err", err)
137 return err
138 }
139 star.RepoAt = subjectUri
140 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String())
141 if repoErr == nil && repo.RepoDid != "" {
142 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil {
143 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
144 }
145 }
146 default:
147 l.Error("star record has neither subject nor subjectDid")
148 return fmt.Errorf("star record has neither subject nor subjectDid")
149 }
150 err = db.AddStar(i.Db, star)
151 case jmodels.CommitOperationDelete:
152 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
153 }
154
155 if err != nil {
156 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
157 }
158
159 return nil
160}
161
162func (i *Ingester) ingestFollow(e *jmodels.Event) error {
163 var err error
164 did := e.Did
165
166 l := i.Logger.With("handler", "ingestFollow")
167 l = l.With("nsid", e.Commit.Collection)
168
169 switch e.Commit.Operation {
170 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
171 raw := json.RawMessage(e.Commit.Record)
172 record := tangled.GraphFollow{}
173 err = json.Unmarshal(raw, &record)
174 if err != nil {
175 l.Error("invalid record", "err", err)
176 return err
177 }
178
179 err = db.AddFollow(i.Db, &models.Follow{
180 UserDid: did,
181 SubjectDid: record.Subject,
182 Rkey: e.Commit.RKey,
183 })
184 case jmodels.CommitOperationDelete:
185 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
186 }
187
188 if err != nil {
189 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
190 }
191
192 return nil
193}
194
195func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
196 did := e.Did
197 var err error
198
199 l := i.Logger.With("handler", "ingestPublicKey")
200 l = l.With("nsid", e.Commit.Collection)
201
202 switch e.Commit.Operation {
203 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
204 l.Debug("processing add of pubkey")
205 raw := json.RawMessage(e.Commit.Record)
206 record := tangled.PublicKey{}
207 err = json.Unmarshal(raw, &record)
208 if err != nil {
209 l.Error("invalid record", "err", err)
210 return err
211 }
212
213 name := record.Name
214 key := record.Key
215 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
216 case jmodels.CommitOperationDelete:
217 l.Debug("processing delete of pubkey")
218 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
219 }
220
221 if err != nil {
222 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
223 }
224
225 return nil
226}
227
228func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
229 did := e.Did
230 var err error
231
232 l := i.Logger.With("handler", "ingestArtifact")
233 l = l.With("nsid", e.Commit.Collection)
234
235 switch e.Commit.Operation {
236 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
237 raw := json.RawMessage(e.Commit.Record)
238 record := tangled.RepoArtifact{}
239 err = json.Unmarshal(raw, &record)
240 if err != nil {
241 l.Error("invalid record", "err", err)
242 return err
243 }
244
245 var repo *models.Repo
246 if record.RepoDid != nil && *record.RepoDid != "" {
247 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
248 if err != nil && !errors.Is(err, sql.ErrNoRows) {
249 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
250 }
251 }
252 if repo == nil && record.Repo != nil {
253 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
254 if parseErr != nil {
255 return parseErr
256 }
257 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
258 if err != nil {
259 return err
260 }
261 }
262 if repo == nil {
263 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
264 }
265
266 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push")
267 if err != nil || !ok {
268 return err
269 }
270
271 repoDid := repo.RepoDid
272 if repoDid == "" && record.RepoDid != nil {
273 repoDid = *record.RepoDid
274 }
275 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
276 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil {
277 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
278 }
279 }
280
281 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
282 if err != nil {
283 createdAt = time.Now()
284 }
285
286 artifact := models.Artifact{
287 Did: did,
288 Rkey: e.Commit.RKey,
289 RepoAt: repo.RepoAt(),
290 Tag: plumbing.Hash(record.Tag),
291 CreatedAt: createdAt,
292 BlobCid: cid.Cid(record.Artifact.Ref),
293 Name: record.Name,
294 Size: uint64(record.Artifact.Size),
295 MimeType: record.Artifact.MimeType,
296 }
297
298 err = db.AddArtifact(i.Db, artifact)
299 case jmodels.CommitOperationDelete:
300 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
301 }
302
303 if err != nil {
304 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
305 }
306
307 return nil
308}
309
310func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error {
311 did := e.Did
312 var err error
313
314 l := i.Logger.With("handler", "ingestProfile")
315 l = l.With("nsid", e.Commit.Collection)
316
317 if e.Commit.RKey != "self" {
318 return fmt.Errorf("ingestProfile only ingests `self` record")
319 }
320
321 switch e.Commit.Operation {
322 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
323 raw := json.RawMessage(e.Commit.Record)
324 record := tangled.ActorProfile{}
325 err = json.Unmarshal(raw, &record)
326 if err != nil {
327 l.Error("invalid record", "err", err)
328 return err
329 }
330
331 avatar := ""
332 if record.Avatar != nil {
333 avatar = record.Avatar.Ref.String()
334 }
335
336 description := ""
337 if record.Description != nil {
338 description = *record.Description
339 }
340
341 includeBluesky := record.Bluesky
342
343 pronouns := ""
344 if record.Pronouns != nil {
345 pronouns = *record.Pronouns
346 }
347
348 location := ""
349 if record.Location != nil {
350 location = *record.Location
351 }
352
353 var links [5]string
354 for i, l := range record.Links {
355 if i < 5 {
356 links[i] = l
357 }
358 }
359
360 var stats [2]models.VanityStat
361 for i, s := range record.Stats {
362 if i < 2 {
363 stats[i].Kind = models.ParseVanityStatKind(s)
364 }
365 }
366
367 var pinned [6]string
368 for i, r := range record.PinnedRepositories {
369 if i < 6 {
370 pinned[i] = r
371 }
372 }
373
374 var preferredHandle syntax.Handle
375 if record.PreferredHandle != nil {
376 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
377 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
378 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
379 preferredHandle = h
380 }
381 }
382 }
383
384 profile := models.Profile{
385 Did: did,
386 Avatar: avatar,
387 Description: description,
388 IncludeBluesky: includeBluesky,
389 Location: location,
390 Links: links,
391 Stats: stats,
392 PinnedRepos: pinned,
393 Pronouns: pronouns,
394 PreferredHandle: preferredHandle,
395 }
396
397 ddb, ok := i.Db.Execer.(*db.DB)
398 if !ok {
399 return fmt.Errorf("failed to index profile record, invalid db cast")
400 }
401
402 tx, err := ddb.Begin()
403 if err != nil {
404 return fmt.Errorf("failed to start transaction")
405 }
406
407 err = db.ValidateProfile(tx, &profile)
408 if err != nil {
409 return fmt.Errorf("invalid profile record")
410 }
411
412 err = db.UpsertProfile(tx, &profile)
413 case jmodels.CommitOperationDelete:
414 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
415 }
416
417 if err != nil {
418 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
419 }
420
421 return nil
422}
423
424func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
425 did := e.Did
426 var err error
427
428 l := i.Logger.With("handler", "ingestSpindleMember")
429 l = l.With("nsid", e.Commit.Collection)
430
431 switch e.Commit.Operation {
432 case jmodels.CommitOperationCreate:
433 raw := json.RawMessage(e.Commit.Record)
434 record := tangled.SpindleMember{}
435 err = json.Unmarshal(raw, &record)
436 if err != nil {
437 l.Error("invalid record", "err", err)
438 return err
439 }
440
441 // only spindle owner can invite to spindles
442 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
443 if err != nil || !ok {
444 return fmt.Errorf("failed to enforce permissions: %w", err)
445 }
446
447 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
448 if err != nil {
449 return err
450 }
451
452 if memberId.Handle.IsInvalidHandle() {
453 return err
454 }
455
456 ddb, ok := i.Db.Execer.(*db.DB)
457 if !ok {
458 return fmt.Errorf("invalid db cast")
459 }
460
461 err = db.AddSpindleMember(ddb, models.SpindleMember{
462 Did: syntax.DID(did),
463 Rkey: e.Commit.RKey,
464 Instance: record.Instance,
465 Subject: memberId.DID,
466 })
467 if !ok {
468 return fmt.Errorf("failed to add to db: %w", err)
469 }
470
471 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
472 if err != nil {
473 return fmt.Errorf("failed to update ACLs: %w", err)
474 }
475
476 l.Info("added spindle member")
477 case jmodels.CommitOperationDelete:
478 rkey := e.Commit.RKey
479
480 ddb, ok := i.Db.Execer.(*db.DB)
481 if !ok {
482 return fmt.Errorf("failed to index profile record, invalid db cast")
483 }
484
485 // get record from db first
486 members, err := db.GetSpindleMembers(
487 ddb,
488 orm.FilterEq("did", did),
489 orm.FilterEq("rkey", rkey),
490 )
491 if err != nil || len(members) != 1 {
492 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
493 }
494 member := members[0]
495
496 tx, err := ddb.Begin()
497 if err != nil {
498 return fmt.Errorf("failed to start txn: %w", err)
499 }
500
501 // remove record by rkey && update enforcer
502 if err = db.RemoveSpindleMember(
503 tx,
504 orm.FilterEq("did", did),
505 orm.FilterEq("rkey", rkey),
506 ); err != nil {
507 return fmt.Errorf("failed to remove from db: %w", err)
508 }
509
510 // update enforcer
511 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
512 if err != nil {
513 return fmt.Errorf("failed to update ACLs: %w", err)
514 }
515
516 if err = tx.Commit(); err != nil {
517 return fmt.Errorf("failed to commit txn: %w", err)
518 }
519
520 if err = i.Enforcer.E.SavePolicy(); err != nil {
521 return fmt.Errorf("failed to save ACLs: %w", err)
522 }
523
524 l.Info("removed spindle member")
525 }
526
527 return nil
528}
529
530func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
531 did := e.Did
532 var err error
533
534 l := i.Logger.With("handler", "ingestSpindle")
535 l = l.With("nsid", e.Commit.Collection)
536
537 switch e.Commit.Operation {
538 case jmodels.CommitOperationCreate:
539 raw := json.RawMessage(e.Commit.Record)
540 record := tangled.Spindle{}
541 err = json.Unmarshal(raw, &record)
542 if err != nil {
543 l.Error("invalid record", "err", err)
544 return err
545 }
546
547 instance := e.Commit.RKey
548
549 ddb, ok := i.Db.Execer.(*db.DB)
550 if !ok {
551 return fmt.Errorf("failed to index profile record, invalid db cast")
552 }
553
554 err := db.AddSpindle(ddb, models.Spindle{
555 Owner: syntax.DID(did),
556 Instance: instance,
557 })
558 if err != nil {
559 l.Error("failed to add spindle to db", "err", err, "instance", instance)
560 return err
561 }
562
563 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev)
564 if err != nil {
565 l.Error("failed to add spindle to db", "err", err, "instance", instance)
566 return err
567 }
568
569 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
570 if err != nil {
571 return fmt.Errorf("failed to mark verified: %w", err)
572 }
573
574 return nil
575
576 case jmodels.CommitOperationDelete:
577 instance := e.Commit.RKey
578
579 ddb, ok := i.Db.Execer.(*db.DB)
580 if !ok {
581 return fmt.Errorf("failed to index profile record, invalid db cast")
582 }
583
584 // get record from db first
585 spindles, err := db.GetSpindles(
586 ctx,
587 ddb,
588 orm.FilterEq("owner", did),
589 orm.FilterEq("instance", instance),
590 )
591 if err != nil || len(spindles) != 1 {
592 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
593 }
594 spindle := spindles[0]
595
596 tx, err := ddb.Begin()
597 if err != nil {
598 return err
599 }
600 defer func() {
601 tx.Rollback()
602 i.Enforcer.E.LoadPolicy()
603 }()
604
605 // remove spindle members first
606 err = db.RemoveSpindleMember(
607 tx,
608 orm.FilterEq("owner", did),
609 orm.FilterEq("instance", instance),
610 )
611 if err != nil {
612 return err
613 }
614
615 err = db.DeleteSpindle(
616 tx,
617 orm.FilterEq("owner", did),
618 orm.FilterEq("instance", instance),
619 )
620 if err != nil {
621 return err
622 }
623
624 if spindle.Verified != nil {
625 err = i.Enforcer.RemoveSpindle(instance)
626 if err != nil {
627 return err
628 }
629 }
630
631 err = tx.Commit()
632 if err != nil {
633 return err
634 }
635
636 err = i.Enforcer.E.SavePolicy()
637 if err != nil {
638 return err
639 }
640 }
641
642 return nil
643}
644
645func (i *Ingester) ingestString(e *jmodels.Event) error {
646 did := e.Did
647 rkey := e.Commit.RKey
648
649 var err error
650
651 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
652 l.Info("ingesting record")
653
654 ddb, ok := i.Db.Execer.(*db.DB)
655 if !ok {
656 return fmt.Errorf("failed to index string record, invalid db cast")
657 }
658
659 switch e.Commit.Operation {
660 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
661 raw := json.RawMessage(e.Commit.Record)
662 record := tangled.String{}
663 err = json.Unmarshal(raw, &record)
664 if err != nil {
665 l.Error("invalid record", "err", err)
666 return err
667 }
668
669 string := models.StringFromRecord(did, rkey, record)
670
671 if err = i.Validator.ValidateString(&string); err != nil {
672 l.Error("invalid record", "err", err)
673 return err
674 }
675
676 if err = db.AddString(ddb, string); err != nil {
677 l.Error("failed to add string", "err", err)
678 return err
679 }
680
681 return nil
682
683 case jmodels.CommitOperationDelete:
684 if err := db.DeleteString(
685 ddb,
686 orm.FilterEq("did", did),
687 orm.FilterEq("rkey", rkey),
688 ); err != nil {
689 l.Error("failed to delete", "err", err)
690 return fmt.Errorf("failed to delete string record: %w", err)
691 }
692
693 return nil
694 }
695
696 return nil
697}
698
699func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
700 did := e.Did
701 var err error
702
703 l := i.Logger.With("handler", "ingestKnotMember")
704 l = l.With("nsid", e.Commit.Collection)
705
706 switch e.Commit.Operation {
707 case jmodels.CommitOperationCreate:
708 raw := json.RawMessage(e.Commit.Record)
709 record := tangled.KnotMember{}
710 err = json.Unmarshal(raw, &record)
711 if err != nil {
712 l.Error("invalid record", "err", err)
713 return err
714 }
715
716 // only knot owner can invite to knots
717 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
718 if err != nil || !ok {
719 return fmt.Errorf("failed to enforce permissions: %w", err)
720 }
721
722 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
723 if err != nil {
724 return err
725 }
726
727 if memberId.Handle.IsInvalidHandle() {
728 return err
729 }
730
731 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
732 if err != nil {
733 return fmt.Errorf("failed to update ACLs: %w", err)
734 }
735
736 l.Info("added knot member")
737 case jmodels.CommitOperationDelete:
738 // we don't store knot members in a table (like we do for spindle)
739 // and we can't remove this just yet. possibly fixed if we switch
740 // to either:
741 // 1. a knot_members table like with spindle and store the rkey
742 // 2. use the knot host as the rkey
743 //
744 // TODO: implement member deletion
745 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
746 }
747
748 return nil
749}
750
751func (i *Ingester) ingestKnot(e *jmodels.Event) error {
752 did := e.Did
753 var err error
754
755 l := i.Logger.With("handler", "ingestKnot")
756 l = l.With("nsid", e.Commit.Collection)
757
758 switch e.Commit.Operation {
759 case jmodels.CommitOperationCreate:
760 raw := json.RawMessage(e.Commit.Record)
761 record := tangled.Knot{}
762 err = json.Unmarshal(raw, &record)
763 if err != nil {
764 l.Error("invalid record", "err", err)
765 return err
766 }
767
768 domain := e.Commit.RKey
769
770 ddb, ok := i.Db.Execer.(*db.DB)
771 if !ok {
772 return fmt.Errorf("failed to index profile record, invalid db cast")
773 }
774
775 err := db.AddKnot(ddb, domain, did)
776 if err != nil {
777 l.Error("failed to add knot to db", "err", err, "domain", domain)
778 return err
779 }
780
781 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
782 if err != nil {
783 l.Error("failed to verify knot", "err", err, "domain", domain)
784 return err
785 }
786
787 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
788 if err != nil {
789 return fmt.Errorf("failed to mark verified: %w", err)
790 }
791
792 return nil
793
794 case jmodels.CommitOperationDelete:
795 domain := e.Commit.RKey
796
797 ddb, ok := i.Db.Execer.(*db.DB)
798 if !ok {
799 return fmt.Errorf("failed to index knot record, invalid db cast")
800 }
801
802 // get record from db first
803 registrations, err := db.GetRegistrations(
804 ddb,
805 orm.FilterEq("domain", domain),
806 orm.FilterEq("did", did),
807 )
808 if err != nil {
809 return fmt.Errorf("failed to get registration: %w", err)
810 }
811 if len(registrations) != 1 {
812 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
813 }
814 registration := registrations[0]
815
816 tx, err := ddb.Begin()
817 if err != nil {
818 return err
819 }
820 defer func() {
821 tx.Rollback()
822 i.Enforcer.E.LoadPolicy()
823 }()
824
825 err = db.DeleteKnot(
826 tx,
827 orm.FilterEq("did", did),
828 orm.FilterEq("domain", domain),
829 )
830 if err != nil {
831 return err
832 }
833
834 if registration.Registered != nil {
835 err = i.Enforcer.RemoveKnot(domain)
836 if err != nil {
837 return err
838 }
839 }
840
841 err = tx.Commit()
842 if err != nil {
843 return err
844 }
845
846 err = i.Enforcer.E.SavePolicy()
847 if err != nil {
848 return err
849 }
850 }
851
852 return nil
853}
854func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
855 did := e.Did
856 rkey := e.Commit.RKey
857
858 var err error
859
860 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
861 l.Info("ingesting record")
862
863 ddb, ok := i.Db.Execer.(*db.DB)
864 if !ok {
865 return fmt.Errorf("failed to index issue record, invalid db cast")
866 }
867
868 switch e.Commit.Operation {
869 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
870 raw := json.RawMessage(e.Commit.Record)
871 record := tangled.RepoIssue{}
872 err = json.Unmarshal(raw, &record)
873 if err != nil {
874 l.Error("invalid record", "err", err)
875 return err
876 }
877
878 issue := models.IssueFromRecord(did, rkey, record)
879
880 if issue.RepoAt == "" {
881 return fmt.Errorf("issue record has no repo field")
882 }
883
884 if err := i.Validator.ValidateIssue(&issue); err != nil {
885 return fmt.Errorf("failed to validate issue: %w", err)
886 }
887
888 if record.Repo != nil {
889 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo)
890 if repoErr == nil && repo.RepoDid != "" {
891 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil {
892 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
893 }
894 }
895 }
896
897 tx, err := ddb.BeginTx(ctx, nil)
898 if err != nil {
899 l.Error("failed to begin transaction", "err", err)
900 return err
901 }
902 defer tx.Rollback()
903
904 err = db.PutIssue(tx, &issue)
905 if err != nil {
906 l.Error("failed to create issue", "err", err)
907 return err
908 }
909
910 err = tx.Commit()
911 if err != nil {
912 l.Error("failed to commit txn", "err", err)
913 return err
914 }
915
916 return nil
917
918 case jmodels.CommitOperationDelete:
919 tx, err := ddb.BeginTx(ctx, nil)
920 if err != nil {
921 l.Error("failed to begin transaction", "err", err)
922 return err
923 }
924 defer tx.Rollback()
925
926 if err := db.DeleteIssues(
927 tx,
928 did,
929 rkey,
930 ); err != nil {
931 l.Error("failed to delete", "err", err)
932 return fmt.Errorf("failed to delete issue record: %w", err)
933 }
934 if err := tx.Commit(); err != nil {
935 l.Error("failed to commit txn", "err", err)
936 return err
937 }
938
939 return nil
940 }
941
942 return nil
943}
944
945func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
946 did := e.Did
947 rkey := e.Commit.RKey
948
949 var err error
950
951 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
952 l.Info("ingesting record")
953
954 ddb, ok := i.Db.Execer.(*db.DB)
955 if !ok {
956 return fmt.Errorf("failed to index issue comment record, invalid db cast")
957 }
958
959 switch e.Commit.Operation {
960 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
961 raw := json.RawMessage(e.Commit.Record)
962 record := tangled.RepoIssueComment{}
963 err = json.Unmarshal(raw, &record)
964 if err != nil {
965 return fmt.Errorf("invalid record: %w", err)
966 }
967
968 comment, err := models.IssueCommentFromRecord(did, rkey, record)
969 if err != nil {
970 return fmt.Errorf("failed to parse comment from record: %w", err)
971 }
972
973 if err := i.Validator.ValidateIssueComment(comment); err != nil {
974 return fmt.Errorf("failed to validate comment: %w", err)
975 }
976
977 tx, err := ddb.Begin()
978 if err != nil {
979 return fmt.Errorf("failed to start transaction: %w", err)
980 }
981 defer tx.Rollback()
982
983 _, err = db.AddIssueComment(tx, *comment)
984 if err != nil {
985 return fmt.Errorf("failed to create issue comment: %w", err)
986 }
987
988 return tx.Commit()
989
990 case jmodels.CommitOperationDelete:
991 if err := db.DeleteIssueComments(
992 ddb,
993 orm.FilterEq("did", did),
994 orm.FilterEq("rkey", rkey),
995 ); err != nil {
996 return fmt.Errorf("failed to delete issue comment record: %w", err)
997 }
998
999 return nil
1000 }
1001
1002 return nil
1003}
1004
1005func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
1006 did := e.Did
1007 rkey := e.Commit.RKey
1008
1009 var err error
1010
1011 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1012 l.Info("ingesting record")
1013
1014 ddb, ok := i.Db.Execer.(*db.DB)
1015 if !ok {
1016 return fmt.Errorf("failed to index label definition, invalid db cast")
1017 }
1018
1019 switch e.Commit.Operation {
1020 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1021 raw := json.RawMessage(e.Commit.Record)
1022 record := tangled.LabelDefinition{}
1023 err = json.Unmarshal(raw, &record)
1024 if err != nil {
1025 return fmt.Errorf("invalid record: %w", err)
1026 }
1027
1028 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1029 if err != nil {
1030 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1031 }
1032
1033 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1034 return fmt.Errorf("failed to validate labeldef: %w", err)
1035 }
1036
1037 _, err = db.AddLabelDefinition(ddb, def)
1038 if err != nil {
1039 return fmt.Errorf("failed to create labeldef: %w", err)
1040 }
1041
1042 return nil
1043
1044 case jmodels.CommitOperationDelete:
1045 if err := db.DeleteLabelDefinition(
1046 ddb,
1047 orm.FilterEq("did", did),
1048 orm.FilterEq("rkey", rkey),
1049 ); err != nil {
1050 return fmt.Errorf("failed to delete labeldef record: %w", err)
1051 }
1052
1053 return nil
1054 }
1055
1056 return nil
1057}
1058
1059func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1060 did := e.Did
1061 rkey := e.Commit.RKey
1062
1063 var err error
1064
1065 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1066 l.Info("ingesting record")
1067
1068 ddb, ok := i.Db.Execer.(*db.DB)
1069 if !ok {
1070 return fmt.Errorf("failed to index label op, invalid db cast")
1071 }
1072
1073 switch e.Commit.Operation {
1074 case jmodels.CommitOperationCreate:
1075 raw := json.RawMessage(e.Commit.Record)
1076 record := tangled.LabelOp{}
1077 err = json.Unmarshal(raw, &record)
1078 if err != nil {
1079 return fmt.Errorf("invalid record: %w", err)
1080 }
1081
1082 subject := syntax.ATURI(record.Subject)
1083 collection := subject.Collection()
1084
1085 var repo *models.Repo
1086 switch collection {
1087 case tangled.RepoIssueNSID:
1088 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1089 if err != nil || len(i) != 1 {
1090 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1091 }
1092 repo = i[0].Repo
1093 default:
1094 return fmt.Errorf("unsupport label subject: %s", collection)
1095 }
1096
1097 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1098 if err != nil {
1099 return fmt.Errorf("failed to build label application ctx: %w", err)
1100 }
1101
1102 ops := models.LabelOpsFromRecord(did, rkey, record)
1103
1104 for _, o := range ops {
1105 def, ok := actx.Defs[o.OperandKey]
1106 if !ok {
1107 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1108 }
1109 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1110 return fmt.Errorf("failed to validate labelop: %w", err)
1111 }
1112 }
1113
1114 tx, err := ddb.Begin()
1115 if err != nil {
1116 return err
1117 }
1118 defer tx.Rollback()
1119
1120 for _, o := range ops {
1121 _, err = db.AddLabelOp(tx, &o)
1122 if err != nil {
1123 return fmt.Errorf("failed to add labelop: %w", err)
1124 }
1125 }
1126
1127 if err = tx.Commit(); err != nil {
1128 return err
1129 }
1130 }
1131
1132 return nil
1133}