Mirror of @tangled.org/core. Running on a Raspberry Pi Zero 2 (Please be gentle).
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

spindle: create pipeline events from spindle

spindle will emit a `sh.tangled.pipeline` event on:
- `sh.tangled.git.refUpdate` events from knot stream
- live create/update events of `sh.tangled.repo.pull` records

Signed-off-by: Seongmin Lee <git@boltless.me>

+281 -74
+1
knotserver/internal.go
··· 176 176 } 177 177 178 178 for _, line := range lines { 179 + // TODO: pass pushOptions to refUpdate 179 180 err := h.insertRefUpdate(line, gitUserDid, repoDid, repoName) 180 181 if err != nil { 181 182 l.Error("failed to insert op", "err", err, "line", line, "did", gitUserDid, "repo", gitRelativeDir)
+3
nix/modules/spindle.nix
··· 136 136 "sh.tangled.repo" 137 137 "sh.tangled.repo.collaborator" 138 138 "sh.tangled.spindle.member" 139 + "sh.tangled.repo.pull" 139 140 ]}" 141 + # temporary hack to listen for repo.pull from non-tangled users 142 + "TAP_SIGNAL_COLLECTION=sh.tangled.repo.pull" 140 143 ]; 141 144 ExecStart = "${getExe cfg.tap-package} run"; 142 145 };
+14
spindle/db/events.go
··· 70 70 return evts, nil 71 71 } 72 72 73 + func (d *DB) CreatePipelineEvent(rkey string, pipeline tangled.Pipeline, n *notifier.Notifier) error { 74 + eventJson, err := json.Marshal(pipeline) 75 + if err != nil { 76 + return err 77 + } 78 + event := Event{ 79 + Rkey: rkey, 80 + Nsid: tangled.PipelineNSID, 81 + Created: time.Now().UnixNano(), 82 + EventJson: string(eventJson), 83 + } 84 + return d.insertEvent(event, n) 85 + } 86 + 73 87 func (d *DB) createStatusEvent( 74 88 workflowId models.WorkflowId, 75 89 statusKind models.StatusKind,
+174 -72
spindle/server.go
··· 4 4 "context" 5 5 _ "embed" 6 6 "encoding/json" 7 + "errors" 7 8 "fmt" 8 9 "log/slog" 9 10 "maps" ··· 14 13 15 14 "github.com/bluesky-social/indigo/atproto/syntax" 16 15 "github.com/go-chi/chi/v5" 16 + "github.com/go-git/go-git/v5/plumbing/object" 17 17 "github.com/hashicorp/go-version" 18 18 "tangled.org/core/api/tangled" 19 19 "tangled.org/core/eventconsumer" 20 20 "tangled.org/core/eventconsumer/cursor" 21 21 "tangled.org/core/idresolver" 22 + kgit "tangled.org/core/knotserver/git" 22 23 "tangled.org/core/log" 23 24 "tangled.org/core/notifier" 24 25 "tangled.org/core/rbac2" ··· 34 31 "tangled.org/core/spindle/secrets" 35 32 "tangled.org/core/spindle/xrpc" 36 33 "tangled.org/core/tap" 34 + "tangled.org/core/tid" 35 + "tangled.org/core/workflow" 37 36 "tangled.org/core/xrpc/serviceauth" 38 37 ) 39 38 ··· 135 130 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 136 131 } 137 132 138 - // for each incoming sh.tangled.pipeline, we execute 139 - // spindle.processPipeline, which in turn enqueues the pipeline 140 - // job in the above registered queue. 
133 + // spindle listen to knot stream for sh.tangled.git.refUpdate 134 + // which will sync the local workflow files in spindle and enqueues the 135 + // pipeline job for on-push workflows 141 136 ccfg := eventconsumer.NewConsumerConfig() 142 137 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 143 138 ccfg.Dev = cfg.Server.Dev 144 - ccfg.ProcessFunc = spindle.processPipeline 139 + ccfg.ProcessFunc = spindle.processKnotStream 145 140 ccfg.CursorStore = cursorStore 146 141 knownKnots, err := d.Knots() 147 142 if err != nil { ··· 286 281 return x.Router() 287 282 } 288 283 289 - func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 284 + func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 290 285 l := log.FromContext(ctx).With("handler", "processKnotStream") 291 286 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 292 287 if msg.Nsid == tangled.PipelineNSID { ··· 324 319 Rkey: msg.Rkey, 325 320 } 326 321 327 - workflows := make(map[models.Engine][]models.Workflow) 328 - 329 - // Build pipeline environment variables once for all workflows 330 - pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 331 - 332 - for _, w := range tpl.Workflows { 333 - if w != nil { 334 - if _, ok := s.engs[w.Engine]; !ok { 335 - err = s.db.StatusFailed(models.WorkflowId{ 336 - PipelineId: pipelineId, 337 - Name: w.Name, 338 - }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 339 - if err != nil { 340 - return fmt.Errorf("db.StatusFailed: %w", err) 341 - } 342 - 343 - continue 344 - } 345 - 346 - eng := s.engs[w.Engine] 347 - 348 - if _, ok := workflows[eng]; !ok { 349 - workflows[eng] = []models.Workflow{} 350 - } 351 - 352 - ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 353 - if err != nil { 354 - return fmt.Errorf("init workflow: %w", err) 355 - } 356 - 357 - // inject TANGLED_* env 
vars after InitWorkflow 358 - // This prevents user-defined env vars from overriding them 359 - if ewf.Environment == nil { 360 - ewf.Environment = make(map[string]string) 361 - } 362 - maps.Copy(ewf.Environment, pipelineEnv) 363 - 364 - workflows[eng] = append(workflows[eng], *ewf) 365 - 366 - err = s.db.StatusPending(models.WorkflowId{ 367 - PipelineId: pipelineId, 368 - Name: w.Name, 369 - }, s.n) 370 - if err != nil { 371 - return fmt.Errorf("db.StatusPending: %w", err) 372 - } 373 - } 374 - } 375 - 376 - ok := s.jq.Enqueue(queue.Job{ 377 - Run: func() error { 378 - engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 379 - RepoOwner: tpl.TriggerMetadata.Repo.Did, 380 - RepoName: tpl.TriggerMetadata.Repo.Repo, 381 - Workflows: workflows, 382 - }, pipelineId) 383 - return nil 384 - }, 385 - OnFail: func(jobError error) { 386 - s.l.Error("pipeline run failed", "error", jobError) 387 - }, 388 - }) 389 - if ok { 390 - s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 391 - } else { 392 - s.l.Error("failed to enqueue pipeline: queue is full") 322 + err = s.processPipeline(ctx, tpl, pipelineId) 323 + if err != nil { 324 + return err 393 325 } 394 326 } else if msg.Nsid == tangled.GitRefUpdateNSID { 395 327 event := tangled.GitRefUpdate{} ··· 351 409 } 352 410 l.Info("synced git repo") 353 411 354 - // TODO: plan the pipeline 412 + compiler := workflow.Compiler{ 413 + Trigger: tangled.Pipeline_TriggerMetadata{ 414 + Kind: string(workflow.TriggerKindPush), 415 + Push: &tangled.Pipeline_PushTriggerData{ 416 + Ref: event.Ref, 417 + OldSha: event.OldSha, 418 + NewSha: event.NewSha, 419 + }, 420 + Repo: &tangled.Pipeline_TriggerRepo{ 421 + Did: repo.Did.String(), 422 + Knot: repo.Knot, 423 + Repo: repo.Name, 424 + }, 425 + }, 426 + } 427 + 428 + // load workflow definitions from rev (without spindle context) 429 + rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha) 430 + if err != nil { 431 
+ return fmt.Errorf("loading pipeline: %w", err) 432 + } 433 + if len(rawPipeline) == 0 { 434 + l.Info("no workflow definition found for the repo. skipping the event") 435 + return nil 436 + } 437 + tpl := compiler.Compile(compiler.Parse(rawPipeline)) 438 + // TODO: pass compile error to workflow log 439 + for _, w := range compiler.Diagnostics.Errors { 440 + l.Error(w.String()) 441 + } 442 + for _, w := range compiler.Diagnostics.Warnings { 443 + l.Warn(w.String()) 444 + } 445 + 446 + pipelineId := models.PipelineId{ 447 + Knot: tpl.TriggerMetadata.Repo.Knot, 448 + Rkey: tid.TID(), 449 + } 450 + if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 451 + l.Error("failed to create pipeline event", "err", err) 452 + return nil 453 + } 454 + err = s.processPipeline(ctx, tpl, pipelineId) 455 + if err != nil { 456 + return err 457 + } 355 458 } 356 459 460 + return nil 461 + } 462 + 463 + func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) { 464 + if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil { 465 + return nil, fmt.Errorf("syncing git repo: %w", err) 466 + } 467 + gr, err := kgit.Open(repoPath, rev) 468 + if err != nil { 469 + return nil, fmt.Errorf("opening git repo: %w", err) 470 + } 471 + 472 + workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 473 + if errors.Is(err, object.ErrDirectoryNotFound) { 474 + // return empty RawPipeline when directory doesn't exist 475 + return nil, nil 476 + } else if err != nil { 477 + return nil, fmt.Errorf("loading file tree: %w", err) 478 + } 479 + 480 + var rawPipeline workflow.RawPipeline 481 + for _, e := range workflowDir { 482 + if !e.IsFile() { 483 + continue 484 + } 485 + 486 + fpath := filepath.Join(workflow.WorkflowDir, e.Name) 487 + contents, err := gr.RawContent(fpath) 488 + if err != nil { 489 + return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err) 490 + } 491 + 492 + rawPipeline = 
append(rawPipeline, workflow.RawWorkflow{ 493 + Name: e.Name, 494 + Contents: contents, 495 + }) 496 + } 497 + 498 + return rawPipeline, nil 499 + } 500 + 501 + func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error { 502 + // Build pipeline environment variables once for all workflows 503 + pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 504 + 505 + // filter & init workflows 506 + workflows := make(map[models.Engine][]models.Workflow) 507 + for _, w := range tpl.Workflows { 508 + if w == nil { 509 + continue 510 + } 511 + if _, ok := s.engs[w.Engine]; !ok { 512 + err := s.db.StatusFailed(models.WorkflowId{ 513 + PipelineId: pipelineId, 514 + Name: w.Name, 515 + }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 516 + if err != nil { 517 + return fmt.Errorf("db.StatusFailed: %w", err) 518 + } 519 + 520 + continue 521 + } 522 + 523 + eng := s.engs[w.Engine] 524 + 525 + if _, ok := workflows[eng]; !ok { 526 + workflows[eng] = []models.Workflow{} 527 + } 528 + 529 + ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 530 + if err != nil { 531 + return fmt.Errorf("init workflow: %w", err) 532 + } 533 + 534 + // inject TANGLED_* env vars after InitWorkflow 535 + // This prevents user-defined env vars from overriding them 536 + if ewf.Environment == nil { 537 + ewf.Environment = make(map[string]string) 538 + } 539 + maps.Copy(ewf.Environment, pipelineEnv) 540 + 541 + workflows[eng] = append(workflows[eng], *ewf) 542 + } 543 + 544 + // enqueue pipeline 545 + ok := s.jq.Enqueue(queue.Job{ 546 + Run: func() error { 547 + engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 548 + RepoOwner: tpl.TriggerMetadata.Repo.Did, 549 + RepoName: tpl.TriggerMetadata.Repo.Repo, 550 + Workflows: workflows, 551 + }, pipelineId) 552 + return nil 553 + }, 554 + OnFail: func(jobError error) { 555 + s.l.Error("pipeline run failed", "error", 
jobError) 556 + }, 557 + }) 558 + if !ok { 559 + return fmt.Errorf("failed to enqueue pipeline: queue is full") 560 + } 561 + s.l.Info("pipeline enqueued successfully", "id", pipelineId) 562 + 563 + // emit StatusPending for all workflows here (after successful enqueue) 564 + for _, ewfs := range workflows { 565 + for _, ewf := range ewfs { 566 + err := s.db.StatusPending(models.WorkflowId{ 567 + PipelineId: pipelineId, 568 + Name: ewf.Name, 569 + }, s.n) 570 + if err != nil { 571 + return fmt.Errorf("db.StatusPending: %w", err) 572 + } 573 + } 574 + } 357 575 return nil 358 576 } 359 577
+89 -2
spindle/tap.go
··· 11 11 "tangled.org/core/eventconsumer" 12 12 "tangled.org/core/spindle/db" 13 13 "tangled.org/core/spindle/git" 14 + "tangled.org/core/spindle/models" 14 15 "tangled.org/core/tap" 16 + "tangled.org/core/tid" 17 + "tangled.org/core/workflow" 15 18 ) 16 19 17 20 func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error { ··· 284 281 285 282 l.Info("processing pull record") 286 283 284 + // only listen to live events 285 + if !evt.Record.Live { 286 + l.Info("skipping backfill event", "event", evt.Record.AtUri()) 287 + return nil 288 + } 289 + 287 290 switch evt.Record.Action { 288 291 case tap.RecordCreateAction, tap.RecordUpdateAction: 289 - // TODO 292 + record := tangled.RepoPull{} 293 + if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 294 + l.Error("invalid record", "err", err) 295 + return fmt.Errorf("parsing record: %w", err) 296 + } 297 + 298 + // ignore legacy records 299 + if record.Target == nil { 300 + l.Info("ignoring pull record: target repo is nil") 301 + return nil 302 + } 303 + 304 + // ignore patch-based and fork-based PRs 305 + if record.Source == nil || record.Source.Repo != nil { 306 + l.Info("ignoring pull record: not a branch-based pull request") 307 + return nil 308 + } 309 + 310 + // skip if target repo is unknown 311 + repo, err := s.db.GetRepo(syntax.ATURI(record.Target.Repo)) 312 + if err != nil { 313 + l.Warn("target repo is not ingested yet", "repo", record.Target.Repo, "err", err) 314 + return fmt.Errorf("target repo is unknown") 315 + } 316 + 317 + compiler := workflow.Compiler{ 318 + Trigger: tangled.Pipeline_TriggerMetadata{ 319 + Kind: string(workflow.TriggerKindPullRequest), 320 + PullRequest: &tangled.Pipeline_PullRequestTriggerData{ 321 + Action: "create", 322 + SourceBranch: record.Source.Branch, 323 + SourceSha: record.Source.Sha, 324 + TargetBranch: record.Target.Branch, 325 + }, 326 + Repo: &tangled.Pipeline_TriggerRepo{ 327 + Did: repo.Did.String(), 328 + Knot: repo.Knot, 329 + Repo: 
repo.Name, 330 + }, 331 + }, 332 + } 333 + 334 + repoUri := s.newRepoCloneUrl(repo.Knot, repo.Did.String(), repo.Name) 335 + repoPath := s.newRepoPath(repo.Did, repo.Rkey) 336 + 337 + // load workflow definitions from rev (without spindle context) 338 + rawPipeline, err := s.loadPipeline(ctx, repoUri, repoPath, record.Source.Sha) 339 + if err != nil { 340 + // don't retry 341 + l.Error("failed loading pipeline", "err", err) 342 + return nil 343 + } 344 + if len(rawPipeline) == 0 { 345 + l.Info("no workflow definition found for the repo. skipping the event") 346 + return nil 347 + } 348 + tpl := compiler.Compile(compiler.Parse(rawPipeline)) 349 + // TODO: pass compile error to workflow log 350 + for _, w := range compiler.Diagnostics.Errors { 351 + l.Error(w.String()) 352 + } 353 + for _, w := range compiler.Diagnostics.Warnings { 354 + l.Warn(w.String()) 355 + } 356 + 357 + pipelineId := models.PipelineId{ 358 + Knot: tpl.TriggerMetadata.Repo.Knot, 359 + Rkey: tid.TID(), 360 + } 361 + if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 362 + l.Error("failed to create pipeline event", "err", err) 363 + return nil 364 + } 365 + err = s.processPipeline(ctx, tpl, pipelineId) 366 + if err != nil { 367 + // don't retry 368 + l.Error("failed processing pipeline", "err", err) 369 + return nil 370 + } 290 371 case tap.RecordDeleteAction: 291 - // TODO 372 + // no-op 292 373 } 293 374 return nil 294 375 }