Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at sl/comment 500 lines 14 kB view raw
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "strings" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/xrpc" 16 jmodels "github.com/bluesky-social/jetstream/pkg/models" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/knotserver/db" 20 "tangled.org/core/knotserver/git" 21 "tangled.org/core/log" 22 "tangled.org/core/rbac" 23 "tangled.org/core/workflow" 24) 25 26func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error { 27 l := log.FromContext(ctx) 28 raw := json.RawMessage(event.Commit.Record) 29 did := event.Did 30 31 var record tangled.PublicKey 32 if err := json.Unmarshal(raw, &record); err != nil { 33 return fmt.Errorf("failed to unmarshal record: %w", err) 34 } 35 36 pk := db.PublicKey{ 37 Did: did, 38 PublicKey: record, 39 } 40 if err := h.db.AddPublicKey(pk); err != nil { 41 l.Error("failed to add public key", "error", err) 42 return fmt.Errorf("failed to add public key: %w", err) 43 } 44 l.Info("added public key from firehose", "did", did) 45 return nil 46} 47 48func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error { 49 l := log.FromContext(ctx) 50 raw := json.RawMessage(event.Commit.Record) 51 did := event.Did 52 53 var record tangled.KnotMember 54 if err := json.Unmarshal(raw, &record); err != nil { 55 return fmt.Errorf("failed to unmarshal record: %w", err) 56 } 57 58 if record.Domain != h.c.Server.Hostname { 59 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 60 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 61 } 62 63 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 64 if err != nil || !ok { 65 l.Error("failed to add member", "did", did) 66 return fmt.Errorf("failed to enforce permissions: %w", err) 67 } 68 69 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 70 l.Error("failed to add member", "error", err) 71 return fmt.Errorf("failed to add member: %w", err) 72 } 73 l.Info("added member from firehose", "member", record.Subject) 74 75 if err := h.db.AddDid(record.Subject); err != nil { 76 l.Error("failed to add did", "error", err) 77 return fmt.Errorf("failed to add did: %w", err) 78 } 79 h.jc.AddDid(record.Subject) 80 81 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil { 82 return fmt.Errorf("failed to fetch and add keys: %w", err) 83 } 84 85 return nil 86} 87 88// returns a repo path on disk if present, and error if not 89type targetRepo struct { 90 RepoPath string 91 OwnerDid string 92 RepoName string 93 RepoDid string 94} 95 96func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) { 97 if record.Target == nil { 98 return nil, fmt.Errorf("ignoring pull record: target repo is nil") 99 } 100 101 if record.Source == nil { 102 return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request") 103 } 104 105 if record.Source.Repo != nil || record.Source.RepoDid != nil { 106 return nil, fmt.Errorf("ignoring pull record: fork based pull") 107 } 108 109 var repoPath, ownerDid, repoName, repoDid string 110 switch { 111 case record.Target.RepoDid != nil && *record.Target.RepoDid != "": 112 repoDid = *record.Target.RepoDid 113 var lookupErr error 114 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 115 if lookupErr != nil { 116 return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr) 117 } 118 119 case record.Target.Repo != nil: 120 // TODO: get rid of this PDS fetch once all repos have DIDs 121 repoAt, parseErr := syntax.ParseATURI(*record.Target.Repo) 122 if parseErr != nil { 123 return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr) 124 } 125 126 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 127 if resolveErr != nil || ident.Handle.IsInvalidHandle() { 128 return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr) 129 } 130 131 xrpcc := xrpc.Client{ 132 Host: ident.PDSEndpoint(), 133 } 134 135 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 136 if getErr != nil { 137 return nil, fmt.Errorf("failed to resolve repo: %w", getErr) 138 } 139 140 repo := resp.Value.Val.(*tangled.Repo) 141 142 if repo.Knot != h.c.Server.Hostname { 143 return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 144 } 145 146 ownerDid = ident.DID.String() 147 repoName = repo.Name 148 149 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName) 150 if didErr != nil { 151 return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr) 152 } 153 154 var lookupErr error 155 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 156 if lookupErr != nil { 157 return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr) 158 } 159 160 default: 161 return nil, fmt.Errorf("ignoring pull record: target has neither repo nor repoDid") 162 } 163 164 _, err := git.Open(repoPath, record.Source.Branch) 165 if err != nil { 166 return nil, fmt.Errorf("failed to open git repository: %w", err) 167 } 168 169 return &targetRepo{ 170 RepoPath: repoPath, 171 OwnerDid: ownerDid, 172 RepoName: repoName, 173 RepoDid: repoDid, 174 }, nil 175} 176 177func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) { 178 // resolve the PR owner's identity to fetch the blob from their PDS 179 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did) 180 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() { 181 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err) 182 } 183 184 roundNumber := len(record.Rounds) - 1 185 round := record.Rounds[roundNumber] 186 187 // fetch the blob from the PR owner's PDS 188 prOwnerPds := prOwnerIdent.PDSEndpoint() 189 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds)) 190 if err != nil { 191 return nil, fmt.Errorf("failed to construct blob URL: %w", err) 192 } 193 q := blobUrl.Query() 194 q.Set("cid", round.PatchBlob.Ref.String()) 195 q.Set("did", did) 196 blobUrl.RawQuery = q.Encode() 197 198 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil) 199 if err != nil { 200 return nil, fmt.Errorf("failed to create blob request: %w", err) 201 } 202 req.Header.Set("Content-Type", "application/json") 203 204 blobResp, err := http.DefaultClient.Do(req) 205 if err != nil { 206 return nil, fmt.Errorf("failed to fetch blob: %w", err) 207 } 208 defer blobResp.Body.Close() 209 210 blob := io.ReadCloser(blobResp.Body) 211 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob) 212 if err != nil { 213 return nil, fmt.Errorf("failed to parse submission: %w", err) 214 } 215 216 return latestSubmission, nil 217} 218 219func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) { 220 gr, err := git.Open(repoPath, sha) 221 if err != nil { 222 return nil, fmt.Errorf("failed to open git repository: %w", err) 223 } 224 225 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 226 if err != nil { 227 return nil, fmt.Errorf("failed to open workflow directory: %w", err) 228 } 229 230 var pipeline workflow.RawPipeline 231 for _, e := range workflowDir { 232 if !e.IsFile() { 233 continue 234 } 235 236 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 237 contents, err := gr.RawContent(fpath) 238 if err != nil { 239 continue 240 } 241 242 pipeline = append(pipeline, workflow.RawWorkflow{ 243 Name: e.Name, 244 Contents: contents, 245 }) 246 } 247 248 return pipeline, nil 249} 250 251func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline { 252 l := log.FromContext(ctx) 253 254 trigger := tangled.Pipeline_PullRequestTriggerData{ 255 Action: "create", 256 SourceBranch: sourceBranch, 257 SourceSha: sourceSha, 258 TargetBranch: targetBranch, 259 } 260 261 compiler := workflow.Compiler{ 262 Trigger: tangled.Pipeline_TriggerMetadata{ 263 Kind: string(workflow.TriggerKindPullRequest), 264 PullRequest: &trigger, 265 Repo: &tangled.Pipeline_TriggerRepo{ 266 Knot: h.c.Server.Hostname, 267 RepoDid: &targetRepo.RepoDid, 268 Did: targetRepo.OwnerDid, 269 Repo: &targetRepo.RepoName, 270 }, 271 }, 272 } 273 274 l.Info("raw", "raw", rawPipeline) 275 parsed := compiler.Parse(rawPipeline) 276 l.Info("parsed", "parsed", parsed) 277 compiled := compiler.Compile(parsed) 278 279 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics) 280 281 return compiled 282} 283 284func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error { 285 raw := json.RawMessage(event.Commit.Record) 286 rkey := event.Commit.RKey 287 did := event.Did 288 289 var record tangled.RepoPull 290 if err := json.Unmarshal(raw, &record); err != nil { 291 return fmt.Errorf("failed to unmarshal record: %w", err) 292 } 293 294 l := log.FromContext(ctx) 295 l = l.With("handler", "processPull") 296 l = l.With("did", did) 297 298 l.Info("validating pull record") 299 targetRepo, err := h.validatePullRecord(ctx, &record) 300 if err != nil { 301 l.Warn("pull record did not validate, skipping...") 302 return err 303 } 304 305 l = l.With("target_repo", record.Target.Repo) 306 l = l.With("target_branch", record.Target.Branch) 307 308 l.Info("fetching latest submission") 309 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record) 310 if err != nil { 311 return err 312 } 313 314 sha := latestSubmission.SourceRev 315 if sha == "" { 316 return fmt.Errorf("failed to extract source SHA from pull submission") 317 } 318 l = l.With("sha", sha) 319 320 l.Info("discovering workflows", "repo_path", targetRepo.RepoPath) 321 pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha) 322 if err != nil { 323 return err 324 } 325 326 l.Info("compiling pipeline", "workflow_count", len(pipeline)) 327 cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline) 328 329 // do not run empty pipelines 330 if cp.Workflows == nil { 331 l.Info("skipping empty pipeline") 332 return nil 333 } 334 335 l.Info("marshaling pipeline event") 336 eventJson, err := json.Marshal(cp) 337 if err != nil { 338 return fmt.Errorf("failed to marshal pipeline event: %w", err) 339 } 340 341 ev := db.Event{ 342 Rkey: TID(), 343 Nsid: tangled.PipelineNSID, 344 EventJson: string(eventJson), 345 } 346 347 l.Info("inserting pipeline event") 348 return h.db.InsertEvent(ev, h.n) 349} 350 351// duplicated from add collaborator 352func (h *Knot) processCollaborator(ctx context.Context, event *jmodels.Event) error { 353 raw := json.RawMessage(event.Commit.Record) 354 did := event.Did 355 356 var record tangled.RepoCollaborator 357 if err := json.Unmarshal(raw, &record); err != nil { 358 return fmt.Errorf("failed to unmarshal record: %w", err) 359 } 360 361 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject) 362 if err != nil || subjectId.Handle.IsInvalidHandle() { 363 return err 364 } 365 366 var rbacResource string 367 switch { 368 case record.RepoDid != nil && *record.RepoDid != "": 369 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(*record.RepoDid) 370 if lookupErr != nil { 371 return fmt.Errorf("unknown repo DID %s: %w", *record.RepoDid, lookupErr) 372 } 373 if ownerDid != did { 374 return fmt.Errorf("collaborator record author %s does not own repo %s", did, *record.RepoDid) 375 } 376 rbacResource = *record.RepoDid 377 378 case record.Repo != nil: 379 // TODO: get rid of this PDS fetch once all repos have DIDs 380 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 381 if parseErr != nil { 382 return parseErr 383 } 384 385 owner, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 386 if resolveErr != nil || owner.Handle.IsInvalidHandle() { 387 return fmt.Errorf("failed to resolve handle: %w", resolveErr) 388 } 389 390 xrpcc := xrpc.Client{ 391 Host: owner.PDSEndpoint(), 392 } 393 394 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 395 if getErr != nil { 396 return getErr 397 } 398 399 repo := resp.Value.Val.(*tangled.Repo) 400 repoDid, didErr := h.db.GetRepoDid(owner.DID.String(), repo.Name) 401 if didErr != nil { 402 return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", owner.DID.String(), repo.Name, didErr) 403 } 404 rbacResource = repoDid 405 406 default: 407 return fmt.Errorf("collaborator record has neither repo nor repoDid") 408 } 409 410 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, rbacResource) 411 if err != nil { 412 return fmt.Errorf("failed to check permissions: %w", err) 413 } 414 if !ok { 415 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", rbacResource) 416 } 417 418 if err := h.db.AddDid(subjectId.DID.String()); err != nil { 419 return err 420 } 421 h.jc.AddDid(subjectId.DID.String()) 422 423 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, rbacResource); err != nil { 424 return err 425 } 426 427 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 428} 429 430func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error { 431 l := log.FromContext(ctx) 432 433 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 434 if err != nil { 435 l.Error("error building endpoint url", "did", did, "error", err.Error()) 436 return fmt.Errorf("error building endpoint url: %w", err) 437 } 438 439 resp, err := http.Get(keysEndpoint) 440 if err != nil { 441 l.Error("error getting keys", "did", did, "error", err) 442 return fmt.Errorf("error getting keys: %w", err) 443 } 444 defer resp.Body.Close() 445 446 if resp.StatusCode == http.StatusNotFound { 447 l.Info("no keys found for did", "did", did) 448 return nil 449 } 450 451 plaintext, err := io.ReadAll(resp.Body) 452 if err != nil { 453 l.Error("error reading response body", "error", err) 454 return fmt.Errorf("error reading response body: %w", err) 455 } 456 457 for key := range strings.SplitSeq(string(plaintext), "\n") { 458 if key == "" { 459 continue 460 } 461 pk := db.PublicKey{ 462 Did: did, 463 } 464 pk.Key = key 465 if err := h.db.AddPublicKey(pk); err != nil { 466 l.Error("failed to add public key", "error", err) 467 return fmt.Errorf("failed to add public key: %w", err) 468 } 469 } 470 return nil 471} 472 473func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error { 474 if event.Kind != jmodels.EventKindCommit { 475 return nil 476 } 477 478 var err error 479 switch event.Commit.Collection { 480 case tangled.PublicKeyNSID: 481 err = h.processPublicKey(ctx, event) 482 case tangled.KnotMemberNSID: 483 err = h.processKnotMember(ctx, event) 484 case tangled.RepoPullNSID: 485 err = h.processPull(ctx, event) 486 case tangled.RepoCollaboratorNSID: 487 err = h.processCollaborator(ctx, event) 488 } 489 490 if err != nil { 491 h.l.Warn("failed to process event, skipping", "nsid", event.Commit.Collection, "err", err) 492 } 493 494 lastTimeUs := event.TimeUS + 1 495 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 496 h.l.Error("failed to save cursor", "err", saveErr) 497 } 498 499 return nil 500}