Monorepo for Tangled tangled.org
761
fork

Configure Feed

Select the types of activity you want to include in your feed.

appview/ingester: ingest pulls from firehose

Signed-off-by: oppiliappan <me@oppi.li>

authored by

oppiliappan and committed by tangled.org d525c478 704e5cfe

+149
+148
appview/ingester.go
··· 6 6 "encoding/json" 7 7 "errors" 8 8 "fmt" 9 + "io" 9 10 "log/slog" 10 11 "maps" 12 + "net/http" 13 + "net/url" 11 14 "slices" 15 + "sync" 12 16 13 17 "time" 14 18 ··· 17 21 jmodels "github.com/bluesky-social/jetstream/pkg/models" 18 22 "github.com/go-git/go-git/v5/plumbing" 19 23 "github.com/ipfs/go-cid" 24 + "golang.org/x/sync/errgroup" 20 25 "tangled.org/core/api/tangled" 21 26 "tangled.org/core/appview/config" 22 27 "tangled.org/core/appview/db" ··· 75 80 err = i.ingestString(e) 76 81 case tangled.RepoIssueNSID: 77 82 err = i.ingestIssue(ctx, e) 83 + case tangled.RepoPullNSID: 84 + err = i.ingestPull(ctx, e) 78 85 case tangled.RepoIssueCommentNSID: 79 86 err = i.ingestIssueComment(e) 80 87 case tangled.LabelDefinitionNSID: ··· 939 946 ); err != nil { 940 947 l.Error("failed to delete", "err", err) 941 948 return fmt.Errorf("failed to delete issue record: %w", err) 949 + } 950 + if err := tx.Commit(); err != nil { 951 + l.Error("failed to commit txn", "err", err) 952 + return err 953 + } 954 + 955 + return nil 956 + } 957 + 958 + return nil 959 + } 960 + 961 + func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error { 962 + did := e.Did 963 + rkey := e.Commit.RKey 964 + 965 + var err error 966 + 967 + l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 968 + l.Info("ingesting record") 969 + 970 + ddb, ok := i.Db.Execer.(*db.DB) 971 + if !ok { 972 + return fmt.Errorf("failed to index pull record, invalid db cast") 973 + } 974 + 975 + switch e.Commit.Operation { 976 + case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 977 + raw := json.RawMessage(e.Commit.Record) 978 + record := tangled.RepoPull{} 979 + err = json.Unmarshal(raw, &record) 980 + if err != nil { 981 + l.Error("invalid record", "err", err) 982 + return err 983 + } 984 + 985 + ownerId, err := i.IdResolver.ResolveIdent(ctx, did) 986 + if err != nil { 987 + l.Error("failed to resolve did") 988 + return err 989 + } 990 + 991 + // go through and fetch all blobs in parallel 992 + readers := make([]*io.ReadCloser, len(record.Rounds)) 993 + var mu sync.Mutex 994 + 995 + g, gctx := errgroup.WithContext(ctx) 996 + 997 + for idx, b := range record.Rounds { 998 + g.Go(func() error { 999 + // for some reason, a blob is empty 1000 + if b.PatchBlob == nil { 1001 + return fmt.Errorf("missing patchBlob in round %d", idx) 1002 + } 1003 + 1004 + ownerPds := ownerId.PDSEndpoint() 1005 + url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 1006 + q := url.Query() 1007 + q.Set("cid", b.PatchBlob.Ref.String()) 1008 + q.Set("did", did) 1009 + url.RawQuery = q.Encode() 1010 + 1011 + req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil) 1012 + if err != nil { 1013 + l.Error("failed to create request") 1014 + return err 1015 + } 1016 + req.Header.Set("Content-Type", "application/json") 1017 + 1018 + resp, err := http.DefaultClient.Do(req) 1019 + if err != nil { 1020 + l.Error("failed to make request") 1021 + return err 1022 + } 1023 + 1024 + mu.Lock() 1025 + readers[idx] = &resp.Body 1026 + mu.Unlock() 1027 + 1028 + return nil 1029 + }) 1030 + } 1031 + 1032 + if err := g.Wait(); err != nil { 1033 + for _, r := range readers { 1034 + if r != nil && *r != nil { 1035 + (*r).Close() 1036 + } 1037 + } 1038 + return err 1039 + } 1040 + 1041 + defer func() { 1042 + for _, r := range readers { 1043 + if r != nil && *r != nil { 1044 + (*r).Close() 1045 + } 1046 + } 1047 + }() 1048 + 1049 + pull := models.PullFromRecord(did, rkey, record, readers) 1050 + if err := i.Validator.ValidatePull(&pull); err != nil { 1051 + return fmt.Errorf("failed to validate pull: %w", err) 1052 + } 1053 + 1054 + tx, err := ddb.BeginTx(ctx, nil) 1055 + if err != nil { 1056 + l.Error("failed to begin transaction", "err", err) 1057 + return err 1058 + } 1059 + defer tx.Rollback() 1060 + 1061 + err = db.PutPull(tx, &pull) 1062 + if err != nil { 1063 + l.Error("failed to create pull", "err", err) 1064 + return err 1065 + } 1066 + 1067 + err = tx.Commit() 1068 + if err != nil { 1069 + l.Error("failed to commit txn", "err", err) 1070 + return err 1071 + } 1072 + 1073 + return nil 1074 + 1075 + case jmodels.CommitOperationDelete: 1076 + tx, err := ddb.BeginTx(ctx, nil) 1077 + if err != nil { 1078 + l.Error("failed to begin transaction", "err", err) 1079 + return err 1080 + } 1081 + defer tx.Rollback() 1082 + 1083 + if err := db.AbandonPulls( 1084 + tx, 1085 + orm.FilterEq("owner_did", did), 1086 + orm.FilterEq("rkey", rkey), 1087 + ); err != nil { 1088 + l.Error("failed to abandon", "err", err) 1089 + return fmt.Errorf("failed to abandon pull record: %w", err) 942 1090 } 943 1091 if err := tx.Commit(); err != nil { 944 1092 l.Error("failed to commit txn", "err", err)
+1
appview/state/state.go
··· 125 125 tangled.SpindleNSID, 126 126 tangled.KnotNSID, 127 127 tangled.StringNSID, 128 + tangled.RepoPullNSID, 128 129 tangled.RepoIssueNSID, 129 130 tangled.RepoIssueCommentNSID, 130 131 tangled.LabelDefinitionNSID,