// Forked from tangled.org/core, the monorepo for Tangled.
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "strings"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/xrpc"
16 jmodels "github.com/bluesky-social/jetstream/pkg/models"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/knotserver/db"
20 "tangled.org/core/knotserver/git"
21 "tangled.org/core/log"
22 "tangled.org/core/rbac"
23 "tangled.org/core/workflow"
24)
25
26func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error {
27 l := log.FromContext(ctx)
28 raw := json.RawMessage(event.Commit.Record)
29 did := event.Did
30
31 var record tangled.PublicKey
32 if err := json.Unmarshal(raw, &record); err != nil {
33 return fmt.Errorf("failed to unmarshal record: %w", err)
34 }
35
36 pk := db.PublicKey{
37 Did: did,
38 PublicKey: record,
39 }
40 if err := h.db.AddPublicKey(pk); err != nil {
41 l.Error("failed to add public key", "error", err)
42 return fmt.Errorf("failed to add public key: %w", err)
43 }
44 l.Info("added public key from firehose", "did", did)
45 return nil
46}
47
48func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error {
49 l := log.FromContext(ctx)
50 raw := json.RawMessage(event.Commit.Record)
51 did := event.Did
52
53 var record tangled.KnotMember
54 if err := json.Unmarshal(raw, &record); err != nil {
55 return fmt.Errorf("failed to unmarshal record: %w", err)
56 }
57
58 if record.Domain != h.c.Server.Hostname {
59 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
60 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
61 }
62
63 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
64 if err != nil || !ok {
65 l.Error("failed to add member", "did", did)
66 return fmt.Errorf("failed to enforce permissions: %w", err)
67 }
68
69 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
70 l.Error("failed to add member", "error", err)
71 return fmt.Errorf("failed to add member: %w", err)
72 }
73 l.Info("added member from firehose", "member", record.Subject)
74
75 if err := h.db.AddDid(record.Subject); err != nil {
76 l.Error("failed to add did", "error", err)
77 return fmt.Errorf("failed to add did: %w", err)
78 }
79 h.jc.AddDid(record.Subject)
80
81 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil {
82 return fmt.Errorf("failed to fetch and add keys: %w", err)
83 }
84
85 return nil
86}
87
// targetRepo describes the repository a pull request is aimed at, as resolved
// by validatePullRecord.
type targetRepo struct {
	RepoPath string // location of the repository on disk; passed to git.Open
	OwnerDid string // DID of the repository owner
	RepoName string // name of the repository
	RepoDid  string // DID identifying the repository itself
}
95
96func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) {
97 if record.Target == nil {
98 return nil, fmt.Errorf("ignoring pull record: target repo is nil")
99 }
100
101 if record.Source == nil {
102 return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request")
103 }
104
105 if record.Source.Repo != nil || record.Source.RepoDid != nil {
106 return nil, fmt.Errorf("ignoring pull record: fork based pull")
107 }
108
109 var repoPath, ownerDid, repoName, repoDid string
110 switch {
111 case record.Target.RepoDid != nil && *record.Target.RepoDid != "":
112 repoDid = *record.Target.RepoDid
113 var lookupErr error
114 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
115 if lookupErr != nil {
116 return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr)
117 }
118
119 case record.Target.Repo != nil:
120 // TODO: get rid of this PDS fetch once all repos have DIDs
121 repoAt, parseErr := syntax.ParseATURI(*record.Target.Repo)
122 if parseErr != nil {
123 return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr)
124 }
125
126 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
127 if resolveErr != nil || ident.Handle.IsInvalidHandle() {
128 return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr)
129 }
130
131 xrpcc := xrpc.Client{
132 Host: ident.PDSEndpoint(),
133 }
134
135 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
136 if getErr != nil {
137 return nil, fmt.Errorf("failed to resolve repo: %w", getErr)
138 }
139
140 repo := resp.Value.Val.(*tangled.Repo)
141
142 if repo.Knot != h.c.Server.Hostname {
143 return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname)
144 }
145
146 ownerDid = ident.DID.String()
147 repoName = repo.Name
148
149 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName)
150 if didErr != nil {
151 return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr)
152 }
153
154 var lookupErr error
155 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
156 if lookupErr != nil {
157 return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr)
158 }
159
160 default:
161 return nil, fmt.Errorf("ignoring pull record: target has neither repo nor repoDid")
162 }
163
164 _, err := git.Open(repoPath, record.Source.Branch)
165 if err != nil {
166 return nil, fmt.Errorf("failed to open git repository: %w", err)
167 }
168
169 return &targetRepo{
170 RepoPath: repoPath,
171 OwnerDid: ownerDid,
172 RepoName: repoName,
173 RepoDid: repoDid,
174 }, nil
175}
176
177func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) {
178 // resolve the PR owner's identity to fetch the blob from their PDS
179 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did)
180 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() {
181 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err)
182 }
183
184 roundNumber := len(record.Rounds) - 1
185 round := record.Rounds[roundNumber]
186
187 // fetch the blob from the PR owner's PDS
188 prOwnerPds := prOwnerIdent.PDSEndpoint()
189 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds))
190 if err != nil {
191 return nil, fmt.Errorf("failed to construct blob URL: %w", err)
192 }
193 q := blobUrl.Query()
194 q.Set("cid", round.PatchBlob.Ref.String())
195 q.Set("did", did)
196 blobUrl.RawQuery = q.Encode()
197
198 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil)
199 if err != nil {
200 return nil, fmt.Errorf("failed to create blob request: %w", err)
201 }
202 req.Header.Set("Content-Type", "application/json")
203
204 blobResp, err := http.DefaultClient.Do(req)
205 if err != nil {
206 return nil, fmt.Errorf("failed to fetch blob: %w", err)
207 }
208 defer blobResp.Body.Close()
209
210 blob := io.ReadCloser(blobResp.Body)
211 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob)
212 if err != nil {
213 return nil, fmt.Errorf("failed to parse submission: %w", err)
214 }
215
216 return latestSubmission, nil
217}
218
219func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) {
220 gr, err := git.Open(repoPath, sha)
221 if err != nil {
222 return nil, fmt.Errorf("failed to open git repository: %w", err)
223 }
224
225 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
226 if err != nil {
227 return nil, fmt.Errorf("failed to open workflow directory: %w", err)
228 }
229
230 var pipeline workflow.RawPipeline
231 for _, e := range workflowDir {
232 if !e.IsFile() {
233 continue
234 }
235
236 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
237 contents, err := gr.RawContent(fpath)
238 if err != nil {
239 continue
240 }
241
242 pipeline = append(pipeline, workflow.RawWorkflow{
243 Name: e.Name,
244 Contents: contents,
245 })
246 }
247
248 return pipeline, nil
249}
250
251func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline {
252 l := log.FromContext(ctx)
253
254 trigger := tangled.Pipeline_PullRequestTriggerData{
255 Action: "create",
256 SourceBranch: sourceBranch,
257 SourceSha: sourceSha,
258 TargetBranch: targetBranch,
259 }
260
261 compiler := workflow.Compiler{
262 Trigger: tangled.Pipeline_TriggerMetadata{
263 Kind: string(workflow.TriggerKindPullRequest),
264 PullRequest: &trigger,
265 Repo: &tangled.Pipeline_TriggerRepo{
266 Knot: h.c.Server.Hostname,
267 RepoDid: &targetRepo.RepoDid,
268 Did: targetRepo.OwnerDid,
269 Repo: &targetRepo.RepoName,
270 },
271 },
272 }
273
274 l.Info("raw", "raw", rawPipeline)
275 parsed := compiler.Parse(rawPipeline)
276 l.Info("parsed", "parsed", parsed)
277 compiled := compiler.Compile(parsed)
278
279 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics)
280
281 return compiled
282}
283
284func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error {
285 raw := json.RawMessage(event.Commit.Record)
286 rkey := event.Commit.RKey
287 did := event.Did
288
289 var record tangled.RepoPull
290 if err := json.Unmarshal(raw, &record); err != nil {
291 return fmt.Errorf("failed to unmarshal record: %w", err)
292 }
293
294 l := log.FromContext(ctx)
295 l = l.With("handler", "processPull")
296 l = l.With("did", did)
297
298 l.Info("validating pull record")
299 targetRepo, err := h.validatePullRecord(ctx, &record)
300 if err != nil {
301 l.Warn("pull record did not validate, skipping...")
302 return err
303 }
304
305 l = l.With("target_repo", record.Target.Repo)
306 l = l.With("target_branch", record.Target.Branch)
307
308 l.Info("fetching latest submission")
309 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record)
310 if err != nil {
311 return err
312 }
313
314 sha := latestSubmission.SourceRev
315 if sha == "" {
316 return fmt.Errorf("failed to extract source SHA from pull submission")
317 }
318 l = l.With("sha", sha)
319
320 l.Info("discovering workflows", "repo_path", targetRepo.RepoPath)
321 pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha)
322 if err != nil {
323 return err
324 }
325
326 l.Info("compiling pipeline", "workflow_count", len(pipeline))
327 cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline)
328
329 // do not run empty pipelines
330 if cp.Workflows == nil {
331 l.Info("skipping empty pipeline")
332 return nil
333 }
334
335 l.Info("marshaling pipeline event")
336 eventJson, err := json.Marshal(cp)
337 if err != nil {
338 return fmt.Errorf("failed to marshal pipeline event: %w", err)
339 }
340
341 ev := db.Event{
342 Rkey: TID(),
343 Nsid: tangled.PipelineNSID,
344 EventJson: string(eventJson),
345 }
346
347 l.Info("inserting pipeline event")
348 return h.db.InsertEvent(ev, h.n)
349}
350
351// duplicated from add collaborator
352func (h *Knot) processCollaborator(ctx context.Context, event *jmodels.Event) error {
353 raw := json.RawMessage(event.Commit.Record)
354 did := event.Did
355
356 var record tangled.RepoCollaborator
357 if err := json.Unmarshal(raw, &record); err != nil {
358 return fmt.Errorf("failed to unmarshal record: %w", err)
359 }
360
361 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject)
362 if err != nil || subjectId.Handle.IsInvalidHandle() {
363 return err
364 }
365
366 var rbacResource string
367 switch {
368 case record.RepoDid != nil && *record.RepoDid != "":
369 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(*record.RepoDid)
370 if lookupErr != nil {
371 return fmt.Errorf("unknown repo DID %s: %w", *record.RepoDid, lookupErr)
372 }
373 if ownerDid != did {
374 return fmt.Errorf("collaborator record author %s does not own repo %s", did, *record.RepoDid)
375 }
376 rbacResource = *record.RepoDid
377
378 case record.Repo != nil:
379 // TODO: get rid of this PDS fetch once all repos have DIDs
380 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
381 if parseErr != nil {
382 return parseErr
383 }
384
385 owner, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
386 if resolveErr != nil || owner.Handle.IsInvalidHandle() {
387 return fmt.Errorf("failed to resolve handle: %w", resolveErr)
388 }
389
390 xrpcc := xrpc.Client{
391 Host: owner.PDSEndpoint(),
392 }
393
394 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
395 if getErr != nil {
396 return getErr
397 }
398
399 repo := resp.Value.Val.(*tangled.Repo)
400 repoDid, didErr := h.db.GetRepoDid(owner.DID.String(), repo.Name)
401 if didErr != nil {
402 return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", owner.DID.String(), repo.Name, didErr)
403 }
404 rbacResource = repoDid
405
406 default:
407 return fmt.Errorf("collaborator record has neither repo nor repoDid")
408 }
409
410 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, rbacResource)
411 if err != nil {
412 return fmt.Errorf("failed to check permissions: %w", err)
413 }
414 if !ok {
415 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", rbacResource)
416 }
417
418 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
419 return err
420 }
421 h.jc.AddDid(subjectId.DID.String())
422
423 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, rbacResource); err != nil {
424 return err
425 }
426
427 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
428}
429
430func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error {
431 l := log.FromContext(ctx)
432
433 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
434 if err != nil {
435 l.Error("error building endpoint url", "did", did, "error", err.Error())
436 return fmt.Errorf("error building endpoint url: %w", err)
437 }
438
439 resp, err := http.Get(keysEndpoint)
440 if err != nil {
441 l.Error("error getting keys", "did", did, "error", err)
442 return fmt.Errorf("error getting keys: %w", err)
443 }
444 defer resp.Body.Close()
445
446 if resp.StatusCode == http.StatusNotFound {
447 l.Info("no keys found for did", "did", did)
448 return nil
449 }
450
451 plaintext, err := io.ReadAll(resp.Body)
452 if err != nil {
453 l.Error("error reading response body", "error", err)
454 return fmt.Errorf("error reading response body: %w", err)
455 }
456
457 for key := range strings.SplitSeq(string(plaintext), "\n") {
458 if key == "" {
459 continue
460 }
461 pk := db.PublicKey{
462 Did: did,
463 }
464 pk.Key = key
465 if err := h.db.AddPublicKey(pk); err != nil {
466 l.Error("failed to add public key", "error", err)
467 return fmt.Errorf("failed to add public key: %w", err)
468 }
469 }
470 return nil
471}
472
473func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error {
474 if event.Kind != jmodels.EventKindCommit {
475 return nil
476 }
477
478 var err error
479 switch event.Commit.Collection {
480 case tangled.PublicKeyNSID:
481 err = h.processPublicKey(ctx, event)
482 case tangled.KnotMemberNSID:
483 err = h.processKnotMember(ctx, event)
484 case tangled.RepoPullNSID:
485 err = h.processPull(ctx, event)
486 case tangled.RepoCollaboratorNSID:
487 err = h.processCollaborator(ctx, event)
488 }
489
490 if err != nil {
491 h.l.Warn("failed to process event, skipping", "nsid", event.Commit.Collection, "err", err)
492 }
493
494 lastTimeUs := event.TimeUS + 1
495 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
496 h.l.Error("failed to save cursor", "err", saveErr)
497 }
498
499 return nil
500}