Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "time"
9
10 "tangled.org/core/api/tangled"
11 "tangled.org/core/eventconsumer"
12 "tangled.org/core/rbac"
13 "tangled.org/core/spindle/db"
14
15 comatproto "github.com/bluesky-social/indigo/api/atproto"
16 "github.com/bluesky-social/indigo/atproto/identity"
17 "github.com/bluesky-social/indigo/atproto/syntax"
18 "github.com/bluesky-social/indigo/xrpc"
19 "github.com/bluesky-social/jetstream/pkg/models"
20 securejoin "github.com/cyphar/filepath-securejoin"
21)
22
23type Ingester func(ctx context.Context, e *models.Event) error
24
25func (s *Spindle) ingest() Ingester {
26 return func(ctx context.Context, e *models.Event) error {
27 if e.Kind != models.EventKindCommit {
28 return nil
29 }
30
31 var err error
32 switch e.Commit.Collection {
33 case tangled.SpindleMemberNSID:
34 err = s.ingestMember(ctx, e)
35 case tangled.RepoNSID:
36 err = s.ingestRepo(ctx, e)
37 case tangled.RepoCollaboratorNSID:
38 err = s.ingestCollaborator(ctx, e)
39 }
40
41 if err != nil {
42 s.l.Warn("failed to process message, skipping", "nsid", e.Commit.Collection, "err", err)
43 }
44
45 lastTimeUs := e.TimeUS + 1
46 if saveErr := s.db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
47 s.l.Error("failed to save cursor", "err", saveErr)
48 }
49
50 return nil
51 }
52}
53
54func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error {
55 var err error
56 did := e.Did
57 rkey := e.Commit.RKey
58
59 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID)
60
61 switch e.Commit.Operation {
62 case models.CommitOperationCreate, models.CommitOperationUpdate:
63 raw := e.Commit.Record
64 record := tangled.SpindleMember{}
65 err = json.Unmarshal(raw, &record)
66 if err != nil {
67 l.Error("invalid record", "error", err)
68 return err
69 }
70
71 domain := s.cfg.Server.Hostname
72 recordInstance := record.Instance
73
74 if recordInstance != domain {
75 l.Error("domain mismatch", "domain", recordInstance, "expected", domain)
76 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain)
77 }
78
79 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain)
80 if err != nil || !ok {
81 l.Error("failed to add member", "did", did, "error", err)
82 return fmt.Errorf("failed to enforce permissions: %w", err)
83 }
84
85 if err := db.AddSpindleMember(s.db, db.SpindleMember{
86 Did: syntax.DID(did),
87 Rkey: rkey,
88 Instance: recordInstance,
89 Subject: syntax.DID(record.Subject),
90 Created: time.Now(),
91 }); err != nil {
92 l.Error("failed to add member", "error", err)
93 return fmt.Errorf("failed to add member: %w", err)
94 }
95
96 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil {
97 l.Error("failed to add member", "error", err)
98 return fmt.Errorf("failed to add member: %w", err)
99 }
100 l.Info("added member from firehose", "member", record.Subject)
101
102 if err := s.db.AddDid(record.Subject); err != nil {
103 l.Error("failed to add did", "error", err)
104 return fmt.Errorf("failed to add did: %w", err)
105 }
106 s.jc.AddDid(record.Subject)
107
108 return nil
109
110 case models.CommitOperationDelete:
111 record, err := db.GetSpindleMember(s.db, did, rkey)
112 if err != nil {
113 l.Error("failed to find member", "error", err)
114 return fmt.Errorf("failed to find member: %w", err)
115 }
116
117 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil {
118 l.Error("failed to remove member", "error", err)
119 return fmt.Errorf("failed to remove member: %w", err)
120 }
121
122 if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil {
123 l.Error("failed to add member", "error", err)
124 return fmt.Errorf("failed to add member: %w", err)
125 }
126 l.Info("added member from firehose", "member", record.Subject)
127
128 if err := s.db.RemoveDid(record.Subject.String()); err != nil {
129 l.Error("failed to add did", "error", err)
130 return fmt.Errorf("failed to add did: %w", err)
131 }
132 s.jc.RemoveDid(record.Subject.String())
133
134 }
135 return nil
136}
137
138func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error {
139 var err error
140 did := e.Did
141
142 l := s.l.With("component", "ingester", "record", tangled.RepoNSID)
143
144 l.Info("ingesting repo record", "did", did)
145
146 switch e.Commit.Operation {
147 case models.CommitOperationCreate, models.CommitOperationUpdate:
148 raw := e.Commit.Record
149 record := tangled.Repo{}
150 err = json.Unmarshal(raw, &record)
151 if err != nil {
152 l.Error("invalid record", "error", err)
153 return err
154 }
155
156 domain := s.cfg.Server.Hostname
157
158 // no spindle configured for this repo
159 if record.Spindle == nil {
160 l.Info("no spindle configured", "name", record.Name)
161 return nil
162 }
163
164 // this repo did not want this spindle
165 if *record.Spindle != domain {
166 l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain)
167 return nil
168 }
169
170 // add this repo to the watch list
171 if err := s.db.AddRepo(record.Knot, did, record.Name); err != nil {
172 l.Error("failed to add repo", "error", err)
173 return fmt.Errorf("failed to add repo: %w", err)
174 }
175
176 didSlashRepo, err := securejoin.SecureJoin(did, record.Name)
177 if err != nil {
178 return err
179 }
180
181 // add repo to rbac
182 if err := s.e.AddRepo(did, rbac.ThisServer, didSlashRepo); err != nil {
183 l.Error("failed to add repo to enforcer", "error", err)
184 return fmt.Errorf("failed to add repo: %w", err)
185 }
186
187 // add collaborators to rbac
188 owner, err := s.res.ResolveIdent(ctx, did)
189 if err != nil || owner.Handle.IsInvalidHandle() {
190 return err
191 }
192 if err := s.fetchAndAddCollaborators(ctx, owner, didSlashRepo); err != nil {
193 return err
194 }
195
196 // add this knot to the event consumer
197 src := eventconsumer.NewKnotSource(record.Knot)
198 s.ks.AddSource(context.Background(), src)
199
200 return nil
201
202 }
203 return nil
204}
205
206func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error {
207 var err error
208
209 l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did)
210
211 l.Info("ingesting collaborator record")
212
213 switch e.Commit.Operation {
214 case models.CommitOperationCreate, models.CommitOperationUpdate:
215 raw := e.Commit.Record
216 record := tangled.RepoCollaborator{}
217 err = json.Unmarshal(raw, &record)
218 if err != nil {
219 l.Error("invalid record", "error", err)
220 return err
221 }
222
223 subjectId, err := s.res.ResolveIdent(ctx, record.Subject)
224 if err != nil || subjectId.Handle.IsInvalidHandle() {
225 return err
226 }
227
228 var rbacResource string
229 var ownerDid string
230 switch {
231 case record.Repo != nil:
232 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
233 if parseErr != nil {
234 l.Info("rejecting record, invalid repoAt", "repoAt", *record.Repo)
235 return nil
236 }
237
238 owner, resolveErr := s.res.ResolveIdent(ctx, repoAt.Authority().String())
239 if resolveErr != nil || owner.Handle.IsInvalidHandle() {
240 return fmt.Errorf("failed to resolve handle: %w", resolveErr)
241 }
242
243 xrpcc := xrpc.Client{
244 Host: owner.PDSEndpoint(),
245 }
246
247 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
248 if getErr != nil {
249 return getErr
250 }
251
252 repo := resp.Value.Val.(*tangled.Repo)
253 rbacResource, _ = securejoin.SecureJoin(owner.DID.String(), repo.Name)
254 ownerDid = owner.DID.String()
255
256 default:
257 l.Info("rejecting collaborator record without repo at-uri (spindle RBAC keyed by owner/name)")
258 return nil
259 }
260
261 if ok, err := s.e.IsCollaboratorInviteAllowed(ownerDid, rbac.ThisServer, rbacResource); !ok || err != nil {
262 return fmt.Errorf("insufficient permissions: %w", err)
263 }
264
265 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, rbacResource); err != nil {
266 l.Error("failed to add collaborator to enforcer", "error", err)
267 return fmt.Errorf("failed to add collaborator: %w", err)
268 }
269
270 return nil
271 }
272 return nil
273}
274
275func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error {
276 l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators")
277
278 l.Info("fetching and adding existing collaborators")
279
280 xrpcc := xrpc.Client{
281 Host: owner.PDSEndpoint(),
282 }
283
284 resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false)
285 if err != nil {
286 return err
287 }
288
289 var errs error
290 for _, r := range resp.Records {
291 if r == nil {
292 continue
293 }
294 record := r.Value.Val.(*tangled.RepoCollaborator)
295
296 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
297 l.Error("failed to add repo to enforcer", "error", err)
298 errors.Join(errs, fmt.Errorf("failed to add repo: %w", err))
299 }
300 }
301
302 return errs
303}