Monorepo for Tangled
tangled.org
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "strings"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/xrpc"
16 jmodels "github.com/bluesky-social/jetstream/pkg/models"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/knotserver/db"
20 "tangled.org/core/knotserver/git"
21 "tangled.org/core/log"
22 "tangled.org/core/rbac"
23 "tangled.org/core/workflow"
24)
25
26func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error {
27 l := log.FromContext(ctx)
28 raw := json.RawMessage(event.Commit.Record)
29 did := event.Did
30
31 var record tangled.PublicKey
32 if err := json.Unmarshal(raw, &record); err != nil {
33 return fmt.Errorf("failed to unmarshal record: %w", err)
34 }
35
36 pk := db.PublicKey{
37 Did: did,
38 PublicKey: record,
39 }
40 if err := h.db.AddPublicKey(pk); err != nil {
41 l.Error("failed to add public key", "error", err)
42 return fmt.Errorf("failed to add public key: %w", err)
43 }
44 l.Info("added public key from firehose", "did", did)
45 return nil
46}
47
48func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error {
49 l := log.FromContext(ctx)
50 raw := json.RawMessage(event.Commit.Record)
51 did := event.Did
52
53 var record tangled.KnotMember
54 if err := json.Unmarshal(raw, &record); err != nil {
55 return fmt.Errorf("failed to unmarshal record: %w", err)
56 }
57
58 if record.Domain != h.c.Server.Hostname {
59 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
60 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
61 }
62
63 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
64 if err != nil || !ok {
65 l.Error("failed to add member", "did", did)
66 return fmt.Errorf("failed to enforce permissions: %w", err)
67 }
68
69 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
70 l.Error("failed to add member", "error", err)
71 return fmt.Errorf("failed to add member: %w", err)
72 }
73 l.Info("added member from firehose", "member", record.Subject)
74
75 if err := h.db.AddDid(record.Subject); err != nil {
76 l.Error("failed to add did", "error", err)
77 return fmt.Errorf("failed to add did: %w", err)
78 }
79 h.jc.AddDid(record.Subject)
80
81 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil {
82 return fmt.Errorf("failed to fetch and add keys: %w", err)
83 }
84
85 return nil
86}
87
// targetRepo identifies the repository on this knot that an incoming pull
// record targets, as resolved by validatePullRecord.
type targetRepo struct {
	// RepoPath is the repository's location on disk.
	RepoPath string
	// OwnerDid is the DID of the repository owner.
	OwnerDid string
	// RepoName is the repository name.
	RepoName string
	// RepoDid is the repository's own DID.
	RepoDid string
}
95
96func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) {
97 if record.Target == nil {
98 return nil, fmt.Errorf("ignoring pull record: target repo is nil")
99 }
100
101 if record.Source == nil {
102 return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request")
103 }
104
105 if record.Source.Repo != nil || record.Source.RepoDid != nil {
106 return nil, fmt.Errorf("ignoring pull record: fork based pull")
107 }
108
109 var repoPath, ownerDid, repoName, repoDid string
110 switch {
111 case record.Target.RepoDid != nil && *record.Target.RepoDid != "":
112 repoDid = *record.Target.RepoDid
113 var lookupErr error
114 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
115 if lookupErr != nil {
116 return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr)
117 }
118
119 case record.Target.Repo != nil:
120 // TODO: get rid of this PDS fetch once all repos have DIDs
121 repoAt, parseErr := syntax.ParseATURI(*record.Target.Repo)
122 if parseErr != nil {
123 return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr)
124 }
125
126 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
127 if resolveErr != nil || ident.Handle.IsInvalidHandle() {
128 return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr)
129 }
130
131 xrpcc := xrpc.Client{
132 Host: ident.PDSEndpoint(),
133 }
134
135 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
136 if getErr != nil {
137 return nil, fmt.Errorf("failed to resolve repo: %w", getErr)
138 }
139
140 repo := resp.Value.Val.(*tangled.Repo)
141
142 if repo.Knot != h.c.Server.Hostname {
143 return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname)
144 }
145
146 ownerDid = ident.DID.String()
147 repoName = repo.Name
148
149 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName)
150 if didErr != nil {
151 return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr)
152 }
153
154 var lookupErr error
155 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
156 if lookupErr != nil {
157 return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr)
158 }
159
160 default:
161 return nil, fmt.Errorf("ignoring pull record: target has neither repo nor repoDid")
162 }
163
164 _, err := git.Open(repoPath, record.Source.Branch)
165 if err != nil {
166 return nil, fmt.Errorf("failed to open git repository: %w", err)
167 }
168
169 return &targetRepo{
170 RepoPath: repoPath,
171 OwnerDid: ownerDid,
172 RepoName: repoName,
173 RepoDid: repoDid,
174 }, nil
175}
176
177func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) {
178 // resolve the PR owner's identity to fetch the blob from their PDS
179 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did)
180 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() {
181 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err)
182 }
183
184 if len(record.Rounds) == 0 {
185 return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record")
186 }
187
188 roundNumber := len(record.Rounds) - 1
189 round := record.Rounds[roundNumber]
190
191 // fetch the blob from the PR owner's PDS
192 prOwnerPds := prOwnerIdent.PDSEndpoint()
193 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds))
194 if err != nil {
195 return nil, fmt.Errorf("failed to construct blob URL: %w", err)
196 }
197 q := blobUrl.Query()
198 q.Set("cid", round.PatchBlob.Ref.String())
199 q.Set("did", did)
200 blobUrl.RawQuery = q.Encode()
201
202 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil)
203 if err != nil {
204 return nil, fmt.Errorf("failed to create blob request: %w", err)
205 }
206 req.Header.Set("Content-Type", "application/json")
207
208 blobResp, err := http.DefaultClient.Do(req)
209 if err != nil {
210 return nil, fmt.Errorf("failed to fetch blob: %w", err)
211 }
212 defer blobResp.Body.Close()
213
214 blob := io.ReadCloser(blobResp.Body)
215 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob)
216 if err != nil {
217 return nil, fmt.Errorf("failed to parse submission: %w", err)
218 }
219
220 return latestSubmission, nil
221}
222
223func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) {
224 gr, err := git.Open(repoPath, sha)
225 if err != nil {
226 return nil, fmt.Errorf("failed to open git repository: %w", err)
227 }
228
229 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
230 if err != nil {
231 return nil, fmt.Errorf("failed to open workflow directory: %w", err)
232 }
233
234 var pipeline workflow.RawPipeline
235 for _, e := range workflowDir {
236 if !e.IsFile() {
237 continue
238 }
239
240 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
241 contents, err := gr.RawContent(fpath)
242 if err != nil {
243 continue
244 }
245
246 pipeline = append(pipeline, workflow.RawWorkflow{
247 Name: e.Name,
248 Contents: contents,
249 })
250 }
251
252 return pipeline, nil
253}
254
255func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline {
256 l := log.FromContext(ctx)
257
258 trigger := tangled.Pipeline_PullRequestTriggerData{
259 Action: "create",
260 SourceBranch: sourceBranch,
261 SourceSha: sourceSha,
262 TargetBranch: targetBranch,
263 }
264
265 compiler := workflow.Compiler{
266 Trigger: tangled.Pipeline_TriggerMetadata{
267 Kind: string(workflow.TriggerKindPullRequest),
268 PullRequest: &trigger,
269 Repo: &tangled.Pipeline_TriggerRepo{
270 Knot: h.c.Server.Hostname,
271 RepoDid: &targetRepo.RepoDid,
272 Did: targetRepo.OwnerDid,
273 Repo: &targetRepo.RepoName,
274 },
275 },
276 }
277
278 l.Info("raw", "raw", rawPipeline)
279 parsed := compiler.Parse(rawPipeline)
280 l.Info("parsed", "parsed", parsed)
281 compiled := compiler.Compile(parsed)
282
283 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics)
284
285 return compiled
286}
287
288func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error {
289 raw := json.RawMessage(event.Commit.Record)
290 rkey := event.Commit.RKey
291 did := event.Did
292
293 var record tangled.RepoPull
294 if err := json.Unmarshal(raw, &record); err != nil {
295 return fmt.Errorf("failed to unmarshal record: %w", err)
296 }
297
298 l := log.FromContext(ctx)
299 l = l.With("handler", "processPull")
300 l = l.With("did", did)
301
302 l.Info("validating pull record")
303 targetRepo, err := h.validatePullRecord(ctx, &record)
304 if err != nil {
305 l.Warn("pull record did not validate, skipping...")
306 return err
307 }
308
309 l = l.With("target_repo", record.Target.Repo)
310 l = l.With("target_branch", record.Target.Branch)
311
312 l.Info("fetching latest submission")
313 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record)
314 if err != nil {
315 return err
316 }
317
318 sha := latestSubmission.SourceRev
319 if sha == "" {
320 return fmt.Errorf("failed to extract source SHA from pull submission")
321 }
322 l = l.With("sha", sha)
323
324 l.Info("discovering workflows", "repo_path", targetRepo.RepoPath)
325 pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha)
326 if err != nil {
327 return err
328 }
329
330 l.Info("compiling pipeline", "workflow_count", len(pipeline))
331 cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline)
332
333 // do not run empty pipelines
334 if cp.Workflows == nil {
335 l.Info("skipping empty pipeline")
336 return nil
337 }
338
339 l.Info("marshaling pipeline event")
340 eventJson, err := json.Marshal(cp)
341 if err != nil {
342 return fmt.Errorf("failed to marshal pipeline event: %w", err)
343 }
344
345 ev := db.Event{
346 Rkey: TID(),
347 Nsid: tangled.PipelineNSID,
348 EventJson: string(eventJson),
349 }
350
351 l.Info("inserting pipeline event")
352 return h.db.InsertEvent(ev, h.n)
353}
354
355// duplicated from add collaborator
356func (h *Knot) processCollaborator(ctx context.Context, event *jmodels.Event) error {
357 raw := json.RawMessage(event.Commit.Record)
358 did := event.Did
359
360 var record tangled.RepoCollaborator
361 if err := json.Unmarshal(raw, &record); err != nil {
362 return fmt.Errorf("failed to unmarshal record: %w", err)
363 }
364
365 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject)
366 if err != nil || subjectId.Handle.IsInvalidHandle() {
367 return err
368 }
369
370 var rbacResource string
371 switch {
372 case record.RepoDid != nil && *record.RepoDid != "":
373 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(*record.RepoDid)
374 if lookupErr != nil {
375 return fmt.Errorf("unknown repo DID %s: %w", *record.RepoDid, lookupErr)
376 }
377 if ownerDid != did {
378 return fmt.Errorf("collaborator record author %s does not own repo %s", did, *record.RepoDid)
379 }
380 rbacResource = *record.RepoDid
381
382 case record.Repo != nil:
383 // TODO: get rid of this PDS fetch once all repos have DIDs
384 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
385 if parseErr != nil {
386 return parseErr
387 }
388
389 owner, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
390 if resolveErr != nil || owner.Handle.IsInvalidHandle() {
391 return fmt.Errorf("failed to resolve handle: %w", resolveErr)
392 }
393
394 xrpcc := xrpc.Client{
395 Host: owner.PDSEndpoint(),
396 }
397
398 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
399 if getErr != nil {
400 return getErr
401 }
402
403 repo := resp.Value.Val.(*tangled.Repo)
404 repoDid, didErr := h.db.GetRepoDid(owner.DID.String(), repo.Name)
405 if didErr != nil {
406 return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", owner.DID.String(), repo.Name, didErr)
407 }
408 rbacResource = repoDid
409
410 default:
411 return fmt.Errorf("collaborator record has neither repo nor repoDid")
412 }
413
414 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, rbacResource)
415 if err != nil {
416 return fmt.Errorf("failed to check permissions: %w", err)
417 }
418 if !ok {
419 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", rbacResource)
420 }
421
422 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
423 return err
424 }
425 h.jc.AddDid(subjectId.DID.String())
426
427 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, rbacResource); err != nil {
428 return err
429 }
430
431 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
432}
433
434func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error {
435 l := log.FromContext(ctx)
436
437 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
438 if err != nil {
439 l.Error("error building endpoint url", "did", did, "error", err.Error())
440 return fmt.Errorf("error building endpoint url: %w", err)
441 }
442
443 resp, err := http.Get(keysEndpoint)
444 if err != nil {
445 l.Error("error getting keys", "did", did, "error", err)
446 return fmt.Errorf("error getting keys: %w", err)
447 }
448 defer resp.Body.Close()
449
450 if resp.StatusCode == http.StatusNotFound {
451 l.Info("no keys found for did", "did", did)
452 return nil
453 }
454
455 plaintext, err := io.ReadAll(resp.Body)
456 if err != nil {
457 l.Error("error reading response body", "error", err)
458 return fmt.Errorf("error reading response body: %w", err)
459 }
460
461 for key := range strings.SplitSeq(string(plaintext), "\n") {
462 if key == "" {
463 continue
464 }
465 pk := db.PublicKey{
466 Did: did,
467 }
468 pk.Key = key
469 if err := h.db.AddPublicKey(pk); err != nil {
470 l.Error("failed to add public key", "error", err)
471 return fmt.Errorf("failed to add public key: %w", err)
472 }
473 }
474 return nil
475}
476
477func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error {
478 var err error
479 switch event.Kind {
480 case jmodels.EventKindIdentity:
481 err = h.resolver.InvalidateIdent(ctx, event.Did)
482 case jmodels.EventKindCommit:
483 switch event.Commit.Collection {
484 case tangled.PublicKeyNSID:
485 err = h.processPublicKey(ctx, event)
486 case tangled.KnotMemberNSID:
487 err = h.processKnotMember(ctx, event)
488 case tangled.RepoPullNSID:
489 err = h.processPull(ctx, event)
490 case tangled.RepoCollaboratorNSID:
491 err = h.processCollaborator(ctx, event)
492 }
493 default:
494 return nil
495 }
496
497 if err != nil {
498 args := []any{"kind", event.Kind, "err", err}
499 if event.Kind == jmodels.EventKindCommit {
500 args = append(args, "nsid", event.Commit.Collection)
501 }
502 h.l.Warn("failed to process event, skipping", args...)
503 }
504
505 lastTimeUs := event.TimeUS + 1
506 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
507 h.l.Error("failed to save cursor", "err", saveErr)
508 }
509
510 return nil
511}