···11# Build both binaries
22# Use BUILDPLATFORM so Go runs natively, cross-compile for target arch
33-FROM --platform=$BUILDPLATFORM golang:1.25 AS builder
33+FROM --platform=$BUILDPLATFORM golang:1.25-trixie AS builder
4455ARG TARGETOS
66ARG TARGETARCH
···4040 go build -a -ldflags='-s -w' -o manager ./cmd/controller
41414242# Unified image with both binaries
4343-FROM gcr.io/distroless/base-debian12:nonroot
4343+FROM gcr.io/distroless/base-debian13:nonroot
4444COPY --from=builder /workspace/loom/manager /manager
4545COPY --from=builder /workspace/loom/loom-runner /loom-runner
4646
+10-23
Makefile
···33# To re-generate a bundle for another specific version without changing the standard setup, you can:
44# - use the VERSION as arg of the bundle target (e.g make bundle VERSION=0.0.2)
55# - use environment variables to overwrite this value (e.g export VERSION=0.0.2)
66-VERSION ?= 0.0.1
66+VERSION ?= 0.1.5
7788# CHANNELS define the bundle channels used in the bundle.
99# Add a new line here if you would like to change its default config. (E.g CHANNELS = "candidate,fast,stable")
···5050# This is useful for CI or a project to utilize a specific version of the operator-sdk toolkit.
5151OPERATOR_SDK_VERSION ?= v1.41.1
5252# Image URL to use all building/pushing image targets
5353-IMG ?= atcr.io/evan.jarrett.net/loom:latest
5353+IMG ?= buoy.cr/evan.jarrett.net/loom:latest
54545555# Get the currently used golang install path (in GOPATH/bin, unless GOBIN is set)
5656ifeq (,$(shell go env GOBIN))
···9999.PHONY: generate
100100generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.
101101 $(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./..."
102102+103103+.PHONY: proto
104104+proto: ## Generate protobuf and gRPC code.
105105+ buf generate
102106103107.PHONY: fmt
104108fmt: ## Run go fmt against code.
···211215212216.PHONY: test-registry-auth
213217test-registry-auth: ## Test registry authentication before building
214214- @echo "Testing registry authentication for atcr.io..."
218218+ @echo "Testing registry authentication for buoy.cr..."
215219 @if [ -f /usr/local/sbin/docker-credential-atcr ]; then \
216220 echo "Testing credential helper..."; \
217217- echo "atcr.io" | docker-credential-atcr get && echo "✓ Credential helper working!" || echo "✗ Credential helper failed"; \
221221+ echo "buoy.cr" | docker-credential-atcr get && echo "✓ Credential helper working!" || echo "✗ Credential helper failed"; \
218222 else \
219223 echo "⚠ Credential helper not found at /usr/local/sbin/docker-credential-atcr"; \
220224 fi
···222226 @echo "Testing Docker config..."
223227 @if [ -f $(HOME)/.docker/config.json ]; then \
224228 echo "✓ Docker config exists at $(HOME)/.docker/config.json"; \
225225- cat $(HOME)/.docker/config.json | grep -q "atcr.io" && echo "✓ atcr.io found in config" || echo "⚠ atcr.io not found in config"; \
229229+ cat $(HOME)/.docker/config.json | grep -q "buoy.cr" && echo "✓ buoy.cr found in config" || echo "⚠ buoy.cr not found in config"; \
226230 else \
227231 echo "✗ Docker config not found"; \
228232 fi
229233 @echo ""
230234 @echo "Testing registry access with docker pull (this will fail if auth is broken)..."
231231- @$(CONTAINER_TOOL) pull atcr.io/evan.jarrett.net/loom-runner:latest 2>/dev/null && echo "✓ Can pull from registry!" || echo "⚠ Cannot pull from registry (may not exist yet)"
232232-233233-# PLATFORMS defines the target platforms for the manager image be built to provide support to multiple
234234-# architectures. (i.e. make docker-buildx IMG=myregistry/mypoperator:0.0.1). To use this option you need to:
235235-# - be able to use docker buildx. More info: https://docs.docker.com/build/buildx/
236236-# - have enabled BuildKit. More info: https://docs.docker.com/develop/develop-images/build_enhancements/
237237-# - be able to push the image to your registry (i.e. if you do not set a valid value via IMG=<myregistry/image:<tag>> then the export will fail)
238238-# To adequately provide solutions that are compatible with multiple platforms, you should consider using this option.
239239-PLATFORMS ?= linux/arm64,linux/amd64,linux/s390x,linux/ppc64le
240240-.PHONY: docker-buildx
241241-docker-buildx: ## Build and push docker image for the manager for cross-platform support
242242- # copy existing Dockerfile and insert --platform=${BUILDPLATFORM} into Dockerfile.cross, and preserve the original Dockerfile
243243- sed -e '1 s/\(^FROM\)/FROM --platform=\$$\{BUILDPLATFORM\}/; t' -e ' 1,// s//FROM --platform=\$$\{BUILDPLATFORM\}/' Dockerfile > Dockerfile.cross
244244- - $(CONTAINER_TOOL) buildx create --name loom-builder
245245- $(CONTAINER_TOOL) buildx use loom-builder
246246- - cd .. && $(CONTAINER_TOOL) buildx build --push --platform=$(PLATFORMS) --tag ${IMG} -f loom/Dockerfile.cross .
247247- - $(CONTAINER_TOOL) buildx rm loom-builder
248248- rm Dockerfile.cross
235235+ @$(CONTAINER_TOOL) pull buoy.cr/evan.jarrett.net/loom-runner:latest 2>/dev/null && echo "✓ Can pull from registry!" || echo "⚠ Cannot pull from registry (may not exist yet)"
249236250237.PHONY: build-installer
251238build-installer: manifests generate kustomize ## Generate a consolidated YAML with CRDs and deployment.
+69
api/v1alpha1/spindleset_types.go
···6161 Secrets []SecretData `json:"secrets,omitempty"`
62626363 // Workflows is the list of workflows to execute in this pipeline.
6464+ // For multi-arch workflows, this contains one entry per matrix leg plus an optional final entry.
6465 // +kubebuilder:validation:MinItems=1
6566 Workflows []WorkflowSpec `json:"workflows"`
6767+6868+ // MultiArch indicates this pipeline run contains multi-arch workflows.
6969+ // When true, the controller creates per-architecture Jobs and gates the final Job.
7070+ // +optional
7171+ MultiArch bool `json:"multiArch,omitempty"`
6672}
67736874// SecretData represents a single secret key-value pair for injection into Jobs.
···81878288// WorkflowSpec defines a workflow to execute as part of a pipeline.
8389// This is the canonical workflow definition that matches the .tangled/workflows/*.yaml format.
9090+// For multi-arch workflows, the engine expands the matrix and creates one WorkflowSpec per leg.
9191+// Each leg has a single Image and Architecture; the matrix metadata lives in PipelineRunSpec.
8492type WorkflowSpec struct {
8593 // Name is the workflow filename (e.g., "workflow-amd64.yaml").
8694 // +kubebuilder:validation:Required
···110118 // Dependencies specifies external dependencies for the workflow.
111119 // +optional
112120 Dependencies *WorkflowDependencies `json:"dependencies,omitempty"`
121121+122122+ // Final defines steps that run once after all matrix legs complete.
123123+ // Only valid on multi-arch workflows. The engine sets this on the dedicated final WorkflowSpec.
124124+ // +optional
125125+ Final *FinalSpec `json:"final,omitempty"`
126126+127127+ // IsMatrixLeg indicates this WorkflowSpec was generated from a matrix expansion.
128128+ // +optional
129129+ IsMatrixLeg bool `json:"isMatrixLeg,omitempty"`
130130+131131+ // IsFinal indicates this WorkflowSpec represents the final step of a multi-arch workflow.
132132+ // +optional
133133+ IsFinal bool `json:"isFinal,omitempty"`
134134+}
135135+136136+// FinalSpec defines steps that run once after all matrix legs complete successfully.
137137+type FinalSpec struct {
138138+ // Architecture is the target architecture for the final steps.
139139+ // +kubebuilder:validation:Required
140140+ // +kubebuilder:validation:Enum=amd64;arm64
141141+ Architecture string `json:"architecture"`
142142+143143+ // Image is the container image for the final steps.
144144+ // If empty, uses the first image from the matrix.
145145+ // +optional
146146+ Image string `json:"image,omitempty"`
147147+148148+ // Steps is the ordered list of steps to execute after all matrix legs complete.
149149+ // +kubebuilder:validation:MinItems=1
150150+ Steps []WorkflowStep `json:"steps"`
113151}
114152115153// WorkflowStep defines a single step in a workflow.
···224262 Name string `json:"name"`
225263226264 // JobName is the name of the Kubernetes Job created for this workflow.
265265+ // For multi-arch workflows, this is empty; use MatrixLegStatuses instead.
227266 // +optional
228267 JobName string `json:"jobName,omitempty"`
229268···238277 // CompletionTime is when the workflow finished.
239278 // +optional
240279 CompletionTime *metav1.Time `json:"completionTime,omitempty"`
280280+281281+ // MatrixLegStatuses tracks per-architecture Job statuses for multi-arch workflows.
282282+ // +optional
283283+ MatrixLegStatuses []MatrixLegStatus `json:"matrixLegStatuses,omitempty"`
284284+285285+ // FinalJobName is the name of the final Job (for multi-arch workflows).
286286+ // +optional
287287+ FinalJobName string `json:"finalJobName,omitempty"`
288288+289289+ // FinalPhase is the phase of the final Job.
290290+ // +optional
291291+ FinalPhase string `json:"finalPhase,omitempty"`
292292+}
293293+294294+// MatrixLegStatus tracks the status of a single matrix leg Job.
295295+type MatrixLegStatus struct {
296296+ // Architecture is the target architecture for this leg.
297297+ Architecture string `json:"architecture"`
298298+299299+ // Image is the container image used for this leg.
300300+ // +optional
301301+ Image string `json:"image,omitempty"`
302302+303303+ // JobName is the name of the Kubernetes Job for this leg.
304304+ // +optional
305305+ JobName string `json:"jobName,omitempty"`
306306+307307+ // Phase is the current phase (Pending, Running, Succeeded, Failed).
308308+ // +optional
309309+ Phase string `json:"phase,omitempty"`
241310}
242311243312// +kubebuilder:object:root=true
+47
api/v1alpha1/zz_generated.deepcopy.go
···2727)
28282929// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
3030+func (in *FinalSpec) DeepCopyInto(out *FinalSpec) {
3131+ *out = *in
3232+ if in.Steps != nil {
3333+ in, out := &in.Steps, &out.Steps
3434+ *out = make([]WorkflowStep, len(*in))
3535+ for i := range *in {
3636+ (*in)[i].DeepCopyInto(&(*out)[i])
3737+ }
3838+ }
3939+}
4040+4141+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FinalSpec.
4242+func (in *FinalSpec) DeepCopy() *FinalSpec {
4343+ if in == nil {
4444+ return nil
4545+ }
4646+ out := new(FinalSpec)
4747+ in.DeepCopyInto(out)
4848+ return out
4949+}
5050+5151+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
5252+func (in *MatrixLegStatus) DeepCopyInto(out *MatrixLegStatus) {
5353+ *out = *in
5454+}
5555+5656+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MatrixLegStatus.
5757+func (in *MatrixLegStatus) DeepCopy() *MatrixLegStatus {
5858+ if in == nil {
5959+ return nil
6060+ }
6161+ out := new(MatrixLegStatus)
6262+ in.DeepCopyInto(out)
6363+ return out
6464+}
6565+6666+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
3067func (in *PipelineRunSpec) DeepCopyInto(out *PipelineRunSpec) {
3168 *out = *in
3269 if in.CloneCommands != nil {
···288325 *out = new(WorkflowDependencies)
289326 (*in).DeepCopyInto(*out)
290327 }
328328+ if in.Final != nil {
329329+ in, out := &in.Final, &out.Final
330330+ *out = new(FinalSpec)
331331+ (*in).DeepCopyInto(*out)
332332+ }
291333}
292334293335// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowSpec.
···310352 if in.CompletionTime != nil {
311353 in, out := &in.CompletionTime, &out.CompletionTime
312354 *out = (*in).DeepCopy()
355355+ }
356356+ if in.MatrixLegStatuses != nil {
357357+ in, out := &in.MatrixLegStatuses, &out.MatrixLegStatuses
358358+ *out = make([]MatrixLegStatus, len(*in))
359359+ copy(*out, *in)
313360 }
314361}
315362
···11+version: v2
22+modules:
33+ - path: proto
44+lint:
55+ use:
66+ - STANDARD
77+breaking:
88+ use:
99+ - FILE
+83-10
cmd/controller/main.go
···2222 _ "embed"
2323 "flag"
2424 "fmt"
2525+ "net"
2526 "os"
2627 "path/filepath"
2728···3738 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3839 ctrl "sigs.k8s.io/controller-runtime"
3940 "sigs.k8s.io/controller-runtime/pkg/certwatcher"
4141+ "sigs.k8s.io/controller-runtime/pkg/client"
4042 "sigs.k8s.io/controller-runtime/pkg/healthz"
4143 "sigs.k8s.io/controller-runtime/pkg/log/zap"
4244 "sigs.k8s.io/controller-runtime/pkg/metrics/filters"
···5052 loomv1alpha1 "tangled.org/evan.jarrett.net/loom/api/v1alpha1"
5153 "tangled.org/evan.jarrett.net/loom/internal/controller"
5254 "tangled.org/evan.jarrett.net/loom/internal/engine"
5555+ loomgrpc "tangled.org/evan.jarrett.net/loom/internal/grpc"
5356 // +kubebuilder:scaffold:imports
5457)
5558···177180178181// initializeSpindle creates a spindle server with KubernetesEngine
179182func initializeSpindle(
180180- ctx context.Context, cfg *config.Config, mgr ctrl.Manager, loomCfg *LoomConfig,
183183+ ctx context.Context, cfg *config.Config, mgr ctrl.Manager, loomCfg *LoomConfig, hub *loomgrpc.Hub, artifacts *loomgrpc.ArtifactStore,
181184) (*spindle.Spindle, error) {
182185 // Initialize Kubernetes engine
183186 // Get namespace from environment (injected via Downward API)
···203206 return nil, fmt.Errorf("failed to create spindle: %w", err)
204207 }
205208206206- // Now create kubernetes engine with access to vault
207207- kubeEngine := engine.NewKubernetesEngine(mgr.GetClient(), mgr.GetConfig(), namespace, template, s.Vault())
209209+ // Now create kubernetes engine with access to vault and gRPC hub
210210+ kubeEngine := engine.NewKubernetesEngine(mgr.GetClient(), mgr.GetConfig(), namespace, template, s.Vault(), hub, artifacts)
208211209212 // Register the engine with spindle by adding to the engines map
210213 s.Engines()["kubernetes"] = kubeEngine
···385388 os.Exit(1)
386389 }
387390391391+ // Create gRPC hub for runner communication
392392+ hub := loomgrpc.NewHub()
393393+394394+ // Create artifact store for pipeline artifacts (scratch directory)
395395+ artifactDir := "/scratch/artifacts"
396396+ if dir := os.Getenv("LOOM_ARTIFACT_DIR"); dir != "" {
397397+ artifactDir = dir
398398+ }
399399+ artifactStore, err := loomgrpc.NewArtifactStore(artifactDir)
400400+ if err != nil {
401401+ setupLog.Error(err, "failed to create artifact store")
402402+ os.Exit(1)
403403+ }
404404+388405 // Initialize spindle server with KubernetesEngine
389389- s, err := initializeSpindle(ctx, spindleCfg, mgr, loomCfg)
406406+ s, err := initializeSpindle(ctx, spindleCfg, mgr, loomCfg, hub, artifactStore)
390407 if err != nil {
391408 setupLog.Error(err, "failed to initialize spindle")
392409 os.Exit(1)
···396413397414 setupLog.Info("spindle server initialized successfully")
398415416416+ // Start gRPC server for runner communication
417417+ grpcAddr := ":9090"
418418+ if addr := os.Getenv("LOOM_GRPC_ADDR"); addr != "" {
419419+ grpcAddr = addr
420420+ }
421421+422422+ grpcServer := loomgrpc.NewServer(hub, artifactStore)
423423+ go func() {
424424+ lis, err := net.Listen("tcp", grpcAddr)
425425+ if err != nil {
426426+ setupLog.Error(err, "failed to listen for gRPC", "address", grpcAddr)
427427+ os.Exit(1)
428428+ }
429429+ setupLog.Info("starting gRPC server", "address", grpcAddr)
430430+ if err := grpcServer.Serve(lis); err != nil {
431431+ setupLog.Error(err, "gRPC server error")
432432+ }
433433+ }()
434434+ defer grpcServer.GracefulStop()
435435+399436 // Start spindle HTTP server in background
400437 go func() {
401438 setupLog.Info("starting spindle HTTP server", "address", spindleCfg.Server.ListenAddr)
···407444 // Get loom image from environment (used for runner init container)
408445 loomImage := os.Getenv("LOOM_IMAGE")
409446 if loomImage == "" {
410410- loomImage = "atcr.io/evan.jarrett.net/loom:latest" // default fallback
447447+ loomImage = "buoy.cr/evan.jarrett.net/loom:latest" // default fallback
448448+ }
449449+450450+ // Discover the gRPC service address that runner pods will use to reach the operator.
451451+ podNamespace := os.Getenv("POD_NAMESPACE")
452452+ if podNamespace == "" {
453453+ podNamespace = "default"
454454+ }
455455+ operatorAddr := os.Getenv("LOOM_OPERATOR_ADDR")
456456+ if operatorAddr == "" {
457457+ // Find the gRPC service by label in our namespace
458458+ var services corev1.ServiceList
459459+ if err := mgr.GetAPIReader().List(context.Background(), &services,
460460+ client.InNamespace(podNamespace),
461461+ client.MatchingLabels{
462462+ "app.kubernetes.io/name": "loom",
463463+ "app.kubernetes.io/component": "grpc",
464464+ },
465465+ ); err != nil {
466466+ setupLog.Error(err, "failed to discover gRPC service")
467467+ os.Exit(1)
468468+ }
469469+ if len(services.Items) == 0 {
470470+ setupLog.Error(nil, "no gRPC service found with label app.kubernetes.io/component=grpc")
471471+ os.Exit(1)
472472+ }
473473+ svc := services.Items[0]
474474+ grpcPort := int32(9090)
475475+ for _, p := range svc.Spec.Ports {
476476+ if p.Name == "grpc" {
477477+ grpcPort = p.Port
478478+ break
479479+ }
480480+ }
481481+ operatorAddr = fmt.Sprintf("%s.%s.svc.cluster.local:%d", svc.Name, podNamespace, grpcPort)
482482+ setupLog.Info("discovered gRPC service", "address", operatorAddr)
411483 }
412484413485 // Setup controller with spindle components
414486 if err := (&controller.SpindleSetReconciler{
415415- Client: mgr.GetClient(),
416416- Scheme: mgr.GetScheme(),
417417- Config: mgr.GetConfig(),
418418- Spindle: s,
419419- LoomImage: loomImage,
487487+ Client: mgr.GetClient(),
488488+ Scheme: mgr.GetScheme(),
489489+ Config: mgr.GetConfig(),
490490+ Spindle: s,
491491+ LoomImage: loomImage,
492492+ OperatorAddr: operatorAddr,
420493 }).SetupWithManager(mgr); err != nil {
421494 setupLog.Error(err, "unable to create controller", "controller", "SpindleSet")
422495 os.Exit(1)
+254-45
cmd/runner/main.go
···88 "io"
99 "os"
1010 "os/exec"
1111+ "path/filepath"
1212+1313+ "google.golang.org/grpc"
1414+ "google.golang.org/grpc/credentials/insecure"
11151216 "tangled.org/core/spindle/models"
1317 loomv1alpha1 "tangled.org/evan.jarrett.net/loom/api/v1alpha1"
1818+ pb "tangled.org/evan.jarrett.net/loom/internal/pb/loom/v1"
1419)
15201621// simpleStep implements the models.Step interface
···1924 command string
2025}
21262222-// extendedLogLine extends models.LogLine with exit code for error reporting
2323-type extendedLogLine struct {
2424- models.LogLine
2525- ExitCode int `json:"exit_code,omitempty"`
2727+func (s *simpleStep) Name() string { return s.name }
2828+func (s *simpleStep) Command() string { return s.command }
2929+func (s *simpleStep) Kind() models.StepKind {
3030+ return models.StepKindUser
2631}
27322828-func (s *simpleStep) Name() string {
2929- return s.name
3333+// grpcEmitter sends events to the operator over gRPC and also writes to stdout.
3434+type grpcEmitter struct {
3535+ stream grpc.BidiStreamingClient[pb.ConnectRequest, pb.ConnectResponse]
3036}
31373232-func (s *simpleStep) Command() string {
3333- return s.command
3838+func (e *grpcEmitter) sendStepControl(stepID int, status string, exitCode int) {
3939+ if e.stream != nil {
4040+ _ = e.stream.Send(&pb.ConnectRequest{
4141+ Event: &pb.ConnectRequest_StepControl{
4242+ StepControl: &pb.StepControl{
4343+ StepId: int32(stepID),
4444+ Status: status,
4545+ ExitCode: int32(exitCode),
4646+ },
4747+ },
4848+ })
4949+ }
3450}
35513636-func (s *simpleStep) Kind() models.StepKind {
3737- return models.StepKindUser
5252+func (e *grpcEmitter) sendLogLine(stepID int, streamName, content string) {
5353+ if e.stream != nil {
5454+ _ = e.stream.Send(&pb.ConnectRequest{
5555+ Event: &pb.ConnectRequest_LogLine{
5656+ LogLine: &pb.LogLine{
5757+ StepId: int32(stepID),
5858+ Stream: streamName,
5959+ Content: content,
6060+ },
6161+ },
6262+ })
6363+ }
3864}
39654066func main() {
···76102 return fmt.Errorf("failed to copy: %w", err)
77103 }
781047979- // Make executable
80105 if err := os.Chmod(dst, 0755); err != nil {
81106 return fmt.Errorf("failed to chmod: %w", err)
82107 }
···96121 return fmt.Errorf("failed to parse workflow spec: %w", err)
97122 }
98123124124+ // Connect to operator via gRPC
125125+ emitter, cleanup, err := connectToOperator(workflow)
126126+ if err != nil {
127127+ // gRPC connection failure is fatal — the operator won't see our events
128128+ return fmt.Errorf("failed to connect to operator: %w", err)
129129+ }
130130+ defer cleanup()
131131+99132 // Set up environment variables
100133 if workflow.Environment != nil {
101134 for k, v := range workflow.Environment {
···105138 }
106139 }
107140141141+ // Create artifacts directory so user steps can write to it (non-root container
142142+ // can't create /artifacts at root itself). Safe to create even when unused.
143143+ artifactsDir := os.Getenv("LOOM_ARTIFACTS")
144144+ if artifactsDir != "" {
145145+ if err := os.MkdirAll(artifactsDir, 0755); err != nil {
146146+ return fmt.Errorf("failed to create artifacts directory: %w", err)
147147+ }
148148+ }
149149+150150+ // For final jobs, download artifacts from matrix legs before executing steps
151151+ if os.Getenv("LOOM_FINAL") == "true" && artifactsDir != "" {
152152+ fmt.Fprintf(os.Stderr, "downloading artifacts from matrix legs...\n")
153153+ if err := downloadArtifacts(emitter, artifactsDir); err != nil {
154154+ return fmt.Errorf("failed to download artifacts: %w", err)
155155+ }
156156+ fmt.Fprintf(os.Stderr, "artifacts downloaded to %s\n", artifactsDir)
157157+ }
158158+108159 // Execute each step
109160 ctx := context.Background()
110161 for i, step := range workflow.Steps {
111111- if err := executeStep(ctx, i, step); err != nil {
162162+ if err := executeStep(ctx, i, step, emitter); err != nil {
112163 return fmt.Errorf("step %d (%s) failed: %w", i, step.Name, err)
113164 }
114165 }
115166167167+ // Upload artifacts if this is a matrix leg
168168+ if os.Getenv("LOOM_MATRIX_LEG") == "true" && artifactsDir != "" {
169169+ if err := uploadArtifacts(emitter, artifactsDir); err != nil {
170170+ fmt.Fprintf(os.Stderr, "WARNING: failed to upload artifacts: %v\n", err)
171171+ }
172172+ }
173173+116174 return nil
117175}
118176119119-func executeStep(ctx context.Context, stepID int, step loomv1alpha1.WorkflowStep) error {
120120- // Create a simple step for logging
121121- simpleStep := &simpleStep{
122122- name: step.Name,
123123- command: step.Command,
177177+// connectToOperator establishes the gRPC connection and sends the identity message.
178178+func connectToOperator(workflow loomv1alpha1.WorkflowSpec) (*grpcEmitter, func(), error) {
179179+ addr := os.Getenv("LOOM_OPERATOR_ADDR")
180180+ if addr == "" {
181181+ return nil, nil, fmt.Errorf("LOOM_OPERATOR_ADDR environment variable not set")
182182+ }
183183+184184+ conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
185185+ if err != nil {
186186+ return nil, nil, fmt.Errorf("failed to create gRPC client: %w", err)
187187+ }
188188+189189+ client := pb.NewLoomRunnerServiceClient(conn)
190190+ stream, err := client.Connect(context.Background())
191191+ if err != nil {
192192+ conn.Close()
193193+ return nil, nil, fmt.Errorf("failed to open gRPC stream: %w", err)
194194+ }
195195+196196+ // Send identity message
197197+ pipelineID := os.Getenv("TANGLED_PIPELINE_ID")
198198+ if pipelineID == "" {
199199+ pipelineID = workflow.Environment["TANGLED_PIPELINE_ID"]
200200+ }
201201+202202+ if err := stream.Send(&pb.ConnectRequest{
203203+ PipelineId: pipelineID,
204204+ WorkflowName: workflow.Name,
205205+ Architecture: workflow.Architecture,
206206+ }); err != nil {
207207+ conn.Close()
208208+ return nil, nil, fmt.Errorf("failed to send identity: %w", err)
209209+ }
210210+211211+ emitter := &grpcEmitter{stream: stream}
212212+ cleanup := func() {
213213+ _ = stream.CloseSend()
214214+ conn.Close()
124215 }
125216126126- // Emit step start event
127127- emitControlEvent(stepID, simpleStep, models.StepStatusStart)
217217+ fmt.Fprintf(os.Stderr, "connected to operator at %s\n", addr)
218218+ return emitter, cleanup, nil
219219+}
220220+221221+func executeStep(ctx context.Context, stepID int, step loomv1alpha1.WorkflowStep, emitter *grpcEmitter) error {
222222+ // Emit step start — gRPC + stdout
223223+ emitter.sendStepControl(stepID, "start", 0)
224224+ emitStdoutControl(stepID, &simpleStep{name: step.Name, command: step.Command}, models.StepStatusStart)
128225129226 // Set step-specific environment variables
130227 if step.Environment != nil {
···136233 }
137234138235 // Create command that auto-sources LOOM_ENV if it exists
139139- // Users can write "VAR=value" to this file to share env vars between steps
140236 wrappedCommand := `if [ -f "$LOOM_ENV" ]; then set -a; source "$LOOM_ENV"; set +a; fi; ` + step.Command
141237 cmd := exec.CommandContext(ctx, "bash", "-c", wrappedCommand)
142238 cmd.Dir = "/tangled/workspace"
143239 cmd.Env = append(os.Environ(), "LOOM_ENV=/tangled/workspace/.loom-env")
144240145145- // Capture stdout and stderr
146241 stdout, err := cmd.StdoutPipe()
147242 if err != nil {
148243 return fmt.Errorf("failed to create stdout pipe: %w", err)
···153248 return fmt.Errorf("failed to create stderr pipe: %w", err)
154249 }
155250156156- // Start the command
157251 if err := cmd.Start(); err != nil {
158158- emitControlEvent(stepID, simpleStep, models.StepStatusEnd)
252252+ emitter.sendStepControl(stepID, "end", 1)
159253 return fmt.Errorf("failed to start command: %w", err)
160254 }
161255162256 // Stream stdout and stderr concurrently
163257 done := make(chan error, 2)
164164- go streamOutput(stdout, stepID, "stdout", done)
165165- go streamOutput(stderr, stepID, "stderr", done)
258258+ go streamOutput(stdout, stepID, "stdout", emitter, done)
259259+ go streamOutput(stderr, stepID, "stderr", emitter, done)
166260167167- // Wait for both streams to complete
168261 for i := 0; i < 2; i++ {
169262 if err := <-done; err != nil {
170170- // Log error but don't fail - we still want to wait for the command
171263 fmt.Fprintf(os.Stderr, "WARNING: error streaming output: %v\n", err)
172264 }
173265 }
···183275 }
184276 }
185277186186- // Emit step end event with exit code for error reporting
187187- emitControlEventWithCode(stepID, simpleStep, models.StepStatusEnd, exitCode)
278278+ // Emit step end — gRPC + stdout
279279+ emitter.sendStepControl(stepID, "end", exitCode)
280280+ emitStdoutControlWithCode(stepID, &simpleStep{name: step.Name, command: step.Command}, models.StepStatusEnd, exitCode)
188281189282 if exitCode != 0 {
190283 return fmt.Errorf("command exited with code %d", exitCode)
···193286 return nil
194287}
195288196196-func streamOutput(reader io.Reader, stepID int, stream string, done chan<- error) {
289289+func streamOutput(reader io.Reader, stepID int, streamName string, emitter *grpcEmitter, done chan<- error) {
197290 scanner := bufio.NewScanner(reader)
198198- // Increase buffer size for long lines
199291 buf := make([]byte, 0, 64*1024)
200292 scanner.Buffer(buf, 1024*1024)
201293202294 for scanner.Scan() {
203295 line := scanner.Text()
204204- emitDataEvent(stepID, stream, line)
296296+ // Send over gRPC (primary channel)
297297+ emitter.sendLogLine(stepID, streamName, line)
298298+ // Also emit to stdout for kubectl logs
299299+ emitStdoutData(stepID, streamName, line)
205300 }
206301207302 done <- scanner.Err()
208303}
209304210210-func emitControlEvent(stepID int, step models.Step, status models.StepStatus) {
305305+// Stdout emitters — for kubectl logs debugging. Not consumed by the operator.
306306+307307+func emitStdoutControl(stepID int, step models.Step, status models.StepStatus) {
211308 logLine := models.NewControlLogLine(stepID, step, status)
212212- emitJSON(logLine)
309309+ emitStdoutJSON(logLine)
213310}
214311215215-// emitControlEventWithCode emits a control event with an exit code for error reporting
216216-func emitControlEventWithCode(stepID int, step models.Step, status models.StepStatus, exitCode int) {
312312+func emitStdoutControlWithCode(stepID int, step models.Step, status models.StepStatus, exitCode int) {
217313 logLine := models.NewControlLogLine(stepID, step, status)
218218- extended := extendedLogLine{
219219- LogLine: logLine,
220220- ExitCode: exitCode,
314314+ type extended struct {
315315+ models.LogLine
316316+ ExitCode int `json:"exit_code,omitempty"`
221317 }
222222- data, err := json.Marshal(extended)
318318+ data, err := json.Marshal(extended{LogLine: logLine, ExitCode: exitCode})
223319 if err != nil {
224224- fmt.Fprintf(os.Stderr, "ERROR: failed to marshal JSON: %v\n", err)
225320 return
226321 }
227322 fmt.Println(string(data))
228323}
229324230230-func emitDataEvent(stepID int, stream, content string) {
325325+func emitStdoutData(stepID int, stream, content string) {
231326 logLine := models.NewDataLogLine(stepID, content, stream)
232232- emitJSON(logLine)
327327+ emitStdoutJSON(logLine)
233328}
234329235235-func emitJSON(logLine models.LogLine) {
330330+func emitStdoutJSON(logLine models.LogLine) {
236331 data, err := json.Marshal(logLine)
237332 if err != nil {
238238- fmt.Fprintf(os.Stderr, "ERROR: failed to marshal JSON: %v\n", err)
239333 return
240334 }
241335 fmt.Println(string(data))
242336}
337337+338338+// uploadArtifacts walks the artifacts directory and streams all files to the operator.
339339+func uploadArtifacts(emitter *grpcEmitter, dir string) error {
340340+ info, err := os.Stat(dir)
341341+ if os.IsNotExist(err) {
342342+ return nil // No artifacts to upload
343343+ }
344344+ if err != nil {
345345+ return err
346346+ }
347347+ if !info.IsDir() {
348348+ return nil
349349+ }
350350+351351+ const chunkSize = 32 * 1024 // 32KB
352352+353353+ return filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
354354+ if err != nil || info.IsDir() {
355355+ return err
356356+ }
357357+358358+ relPath, err := filepath.Rel(dir, path)
359359+ if err != nil {
360360+ return err
361361+ }
362362+363363+ f, err := os.Open(path)
364364+ if err != nil {
365365+ return fmt.Errorf("failed to open artifact %s: %w", relPath, err)
366366+ }
367367+ defer f.Close()
368368+369369+ buf := make([]byte, chunkSize)
370370+ for {
371371+ n, readErr := f.Read(buf)
372372+ isEOF := readErr == io.EOF
373373+374374+ if emitter.stream != nil {
375375+ _ = emitter.stream.Send(&pb.ConnectRequest{
376376+ Event: &pb.ConnectRequest_ArtifactChunk{
377377+ ArtifactChunk: &pb.ArtifactChunk{
378378+ Path: relPath,
379379+ Data: buf[:n],
380380+ Eof: isEOF,
381381+ },
382382+ },
383383+ })
384384+ }
385385+386386+ if isEOF {
387387+ break
388388+ }
389389+ if readErr != nil {
390390+ return fmt.Errorf("failed to read artifact %s: %w", relPath, readErr)
391391+ }
392392+ }
393393+394394+ fmt.Fprintf(os.Stderr, "uploaded artifact: %s\n", relPath)
395395+ return nil
396396+ })
397397+}
398398+399399+// downloadArtifacts receives artifact files from the operator into the local artifacts directory.
400400+// Used by final jobs to receive artifacts from matrix legs.
401401+func downloadArtifacts(emitter *grpcEmitter, dir string) error {
402402+ if emitter.stream == nil {
403403+ return nil
404404+ }
405405+406406+ for {
407407+ resp, err := emitter.stream.Recv()
408408+ if err == io.EOF {
409409+ return nil
410410+ }
411411+ if err != nil {
412412+ return fmt.Errorf("failed to receive artifact: %w", err)
413413+ }
414414+415415+ ad, ok := resp.Event.(*pb.ConnectResponse_ArtifactData)
416416+ if !ok {
417417+ continue
418418+ }
419419+420420+ data := ad.ArtifactData
421421+422422+ // Sentinel: empty path with Eof=true signals "all artifacts sent".
423423+ if data.Path == "" && data.Eof {
424424+ return nil
425425+ }
426426+427427+ targetDir := filepath.Join(dir, data.SourceArchitecture)
428428+ targetPath := filepath.Join(targetDir, data.Path)
429429+430430+ if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil {
431431+ return fmt.Errorf("failed to create artifact directory: %w", err)
432432+ }
433433+434434+ f, err := os.OpenFile(targetPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
435435+ if err != nil {
436436+ return fmt.Errorf("failed to open artifact file: %w", err)
437437+ }
438438+439439+ if len(data.Data) > 0 {
440440+ if _, err := f.Write(data.Data); err != nil {
441441+ f.Close()
442442+ return fmt.Errorf("failed to write artifact: %w", err)
443443+ }
444444+ }
445445+ f.Close()
446446+447447+ if data.Eof {
448448+ fmt.Fprintf(os.Stderr, "downloaded artifact: %s/%s\n", data.SourceArchitecture, data.Path)
449449+ }
450450+ }
451451+}
+102-4
config/crd/bases/loom.j5t.io_spindlesets.yaml
···7777 items:
7878 type: string
7979 type: array
8080+ multiArch:
8181+ description: |-
8282+ MultiArch indicates this pipeline run contains multi-arch workflows.
8383+ When true, the controller creates per-architecture Jobs and gates the final Job.
8484+ type: boolean
8085 pipelineID:
8186 description: PipelineID is the unique identifier for this pipeline
8287 run from the knot.
···110115 container entirely.
111116 type: boolean
112117 workflows:
113113- description: Workflows is the list of workflows to execute in
114114- this pipeline.
118118+ description: |-
119119+ Workflows is the list of workflows to execute in this pipeline.
120120+ For multi-arch workflows, this contains one entry per matrix leg plus an optional final entry.
115121 items:
116122 description: |-
117123 WorkflowSpec defines a workflow to execute as part of a pipeline.
118124 This is the canonical workflow definition that matches the .tangled/workflows/*.yaml format.
125125+ For multi-arch workflows, the engine expands the matrix and creates one WorkflowSpec per leg.
126126+ Each leg has a single Image and Architecture; the matrix metadata lives in PipelineRunSpec.
119127 properties:
120128 architecture:
121129 description: Architecture is the target architecture for
···141149 description: Environment contains workflow-level environment
142150 variables.
143151 type: object
152152+ final:
153153+ description: |-
154154+ Final defines steps that run once after all matrix legs complete.
155155+ Only valid on multi-arch workflows. The engine sets this on the dedicated final WorkflowSpec.
156156+ properties:
157157+ architecture:
158158+ description: Architecture is the target architecture
159159+ for the final steps.
160160+ enum:
161161+ - amd64
162162+ - arm64
163163+ type: string
164164+ image:
165165+ description: |-
166166+ Image is the container image for the final steps.
167167+ If empty, uses the first image from the matrix.
168168+ type: string
169169+ steps:
170170+ description: Steps is the ordered list of steps to execute
171171+ after all matrix legs complete.
172172+ items:
173173+ description: WorkflowStep defines a single step in
174174+ a workflow.
175175+ properties:
176176+ command:
177177+ description: Command is the shell command to execute.
178178+ type: string
179179+ environment:
180180+ additionalProperties:
181181+ type: string
182182+ description: Environment contains step-specific
183183+ environment variables.
184184+ type: object
185185+ name:
186186+ description: Name is the human-readable name of
187187+ the step.
188188+ type: string
189189+ required:
190190+ - command
191191+ - name
192192+ type: object
193193+ minItems: 1
194194+ type: array
195195+ required:
196196+ - architecture
197197+ - steps
198198+ type: object
144199 image:
145200 description: Image is the container image to use for executing
146201 the workflow steps.
147202 type: string
203203+ isFinal:
204204+ description: IsFinal indicates this WorkflowSpec represents
205205+ the final step of a multi-arch workflow.
206206+ type: boolean
207207+ isMatrixLeg:
208208+ description: IsMatrixLeg indicates this WorkflowSpec was
209209+ generated from a matrix expansion.
210210+ type: boolean
148211 name:
149212 description: Name is the workflow filename (e.g., "workflow-amd64.yaml").
150213 type: string
···13601423 description: CompletionTime is when the workflow finished.
13611424 format: date-time
13621425 type: string
14261426+ finalJobName:
14271427+ description: FinalJobName is the name of the final Job (for
14281428+ multi-arch workflows).
14291429+ type: string
14301430+ finalPhase:
14311431+ description: FinalPhase is the phase of the final Job.
14321432+ type: string
13631433 jobName:
13641364- description: JobName is the name of the Kubernetes Job created
13651365- for this workflow.
14341434+ description: |-
14351435+ JobName is the name of the Kubernetes Job created for this workflow.
14361436+ For multi-arch workflows, this is empty; use MatrixLegStatuses instead.
13661437 type: string
14381438+ matrixLegStatuses:
14391439+ description: MatrixLegStatuses tracks per-architecture Job statuses
14401440+ for multi-arch workflows.
14411441+ items:
14421442+ description: MatrixLegStatus tracks the status of a single
14431443+ matrix leg Job.
14441444+ properties:
14451445+ architecture:
14461446+ description: Architecture is the target architecture for
14471447+ this leg.
14481448+ type: string
14491449+ image:
14501450+ description: Image is the container image used for this
14511451+ leg.
14521452+ type: string
14531453+ jobName:
14541454+ description: JobName is the name of the Kubernetes Job
14551455+ for this leg.
14561456+ type: string
14571457+ phase:
14581458+ description: Phase is the current phase (Pending, Running,
14591459+ Succeeded, Failed).
14601460+ type: string
14611461+ required:
14621462+ - architecture
14631463+ type: object
14641464+ type: array
13671465 name:
13681466 description: Name is the workflow name.
13691467 type: string
···22name: loom
33description: A Kubernetes operator that runs CI/CD pipelines from tangled.org
44type: application
55-version: 0.0.1
66-appVersion: "0.0.1"
55+version: 0.1.5
66+appVersion: "0.1.5"
77home: https://github.com/tangled-sh/loom
88sources:
99 - https://github.com/tangled-sh/loom
+104-5
helm/loom/crds/loom.j5t.io_spindlesets.yaml
···7777 items:
7878 type: string
7979 type: array
8080+ multiArch:
8181+ description: |-
8282+ MultiArch indicates this pipeline run contains multi-arch workflows.
8383+ When true, the controller creates per-architecture Jobs and gates the final Job.
8484+ type: boolean
8085 pipelineID:
8186 description: PipelineID is the unique identifier for this pipeline
8287 run from the knot.
···110115 container entirely.
111116 type: boolean
112117 workflows:
113113- description: Workflows is the list of workflows to execute in
114114- this pipeline.
118118+ description: |-
119119+ Workflows is the list of workflows to execute in this pipeline.
120120+ For multi-arch workflows, this contains one entry per matrix leg plus an optional final entry.
115121 items:
116122 description: |-
117123 WorkflowSpec defines a workflow to execute as part of a pipeline.
118124 This is the canonical workflow definition that matches the .tangled/workflows/*.yaml format.
125125+ For multi-arch workflows, the engine expands the matrix and creates one WorkflowSpec per leg.
126126+ Each leg has a single Image and Architecture; the matrix metadata lives in PipelineRunSpec.
119127 properties:
120128 architecture:
121129 description: Architecture is the target architecture for
···141149 description: Environment contains workflow-level environment
142150 variables.
143151 type: object
152152+ final:
153153+ description: |-
154154+ Final defines steps that run once after all matrix legs complete.
155155+ Only valid on multi-arch workflows. The engine sets this on the dedicated final WorkflowSpec.
156156+ properties:
157157+ architecture:
158158+ description: Architecture is the target architecture
159159+ for the final steps.
160160+ enum:
161161+ - amd64
162162+ - arm64
163163+ type: string
164164+ image:
165165+ description: |-
166166+ Image is the container image for the final steps.
167167+ If empty, uses the first image from the matrix.
168168+ type: string
169169+ steps:
170170+ description: Steps is the ordered list of steps to execute
171171+ after all matrix legs complete.
172172+ items:
173173+ description: WorkflowStep defines a single step in
174174+ a workflow.
175175+ properties:
176176+ command:
177177+ description: Command is the shell command to execute.
178178+ type: string
179179+ environment:
180180+ additionalProperties:
181181+ type: string
182182+ description: Environment contains step-specific
183183+ environment variables.
184184+ type: object
185185+ name:
186186+ description: Name is the human-readable name of
187187+ the step.
188188+ type: string
189189+ required:
190190+ - command
191191+ - name
192192+ type: object
193193+ minItems: 1
194194+ type: array
195195+ required:
196196+ - architecture
197197+ - steps
198198+ type: object
144199 image:
145200 description: Image is the container image to use for executing
146201 the workflow steps.
147202 type: string
203203+ isFinal:
204204+ description: IsFinal indicates this WorkflowSpec represents
205205+ the final step of a multi-arch workflow.
206206+ type: boolean
207207+ isMatrixLeg:
208208+ description: IsMatrixLeg indicates this WorkflowSpec was
209209+ generated from a matrix expansion.
210210+ type: boolean
148211 name:
149212 description: Name is the workflow filename (e.g., "workflow-amd64.yaml").
150213 type: string
···12401303 operator:
12411304 description: |-
12421305 Operator represents a key's relationship to the value.
Valid operators are Exists and Equal. Defaults to Equal.
Exists is equivalent to wildcard for value, so that a pod can
tolerate all taints of a particular category.
12461310 type: string
12471311 tolerationSeconds:
12481312 description: |-
···13591423 description: CompletionTime is when the workflow finished.
13601424 format: date-time
13611425 type: string
14261426+ finalJobName:
14271427+ description: FinalJobName is the name of the final Job (for
14281428+ multi-arch workflows).
14291429+ type: string
14301430+ finalPhase:
14311431+ description: FinalPhase is the phase of the final Job.
14321432+ type: string
13621433 jobName:
13631363- description: JobName is the name of the Kubernetes Job created
13641364- for this workflow.
14341434+ description: |-
14351435+ JobName is the name of the Kubernetes Job created for this workflow.
14361436+ For multi-arch workflows, this is empty; use MatrixLegStatuses instead.
13651437 type: string
14381438+ matrixLegStatuses:
14391439+ description: MatrixLegStatuses tracks per-architecture Job statuses
14401440+ for multi-arch workflows.
14411441+ items:
14421442+ description: MatrixLegStatus tracks the status of a single
14431443+ matrix leg Job.
14441444+ properties:
14451445+ architecture:
14461446+ description: Architecture is the target architecture for
14471447+ this leg.
14481448+ type: string
14491449+ image:
14501450+ description: Image is the container image used for this
14511451+ leg.
14521452+ type: string
14531453+ jobName:
14541454+ description: JobName is the name of the Kubernetes Job
14551455+ for this leg.
14561456+ type: string
14571457+ phase:
14581458+ description: Phase is the current phase (Pending, Running,
14591459+ Succeeded, Failed).
14601460+ type: string
14611461+ required:
14621462+ - architecture
14631463+ type: object
14641464+ type: array
13661465 name:
13671466 description: Name is the workflow name.
13681467 type: string
···2233# Image configuration
44image:
55- repository: atcr.io/evan.jarrett.net/loom
55+ repository: buoy.cr/evan.jarrett.net/loom
66 pullPolicy: Always
77 # Overrides the image tag whose default is the chart appVersion.
88 tag: ""
···104104service:
105105 type: ClusterIP
106106 port: 6555
107107+108108+# gRPC service for runner communication
109109+grpc:
110110+ port: 9090
107111108112# RBAC configuration
109113rbac:
+190-69
internal/controller/spindleset_controller.go
···2424 "time"
25252626 "github.com/cenkalti/backoff/v4"
2727+ "github.com/go-logr/logr"
2728 "tangled.org/core/spindle"
2829 "tangled.org/core/spindle/models"
2930···3334 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3435 "k8s.io/apimachinery/pkg/runtime"
3536 "k8s.io/client-go/rest"
3737+ "k8s.io/client-go/util/retry"
3638 ctrl "sigs.k8s.io/controller-runtime"
3739 "sigs.k8s.io/controller-runtime/pkg/client"
3840 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
···5759 // LoomImage is the loom image containing the runner binary
5860 // Set from LOOM_IMAGE environment variable
5961 LoomImage string
6262+6363+ // OperatorAddr is the gRPC address of the operator for runner communication
6464+ OperatorAddr string
60656166 // Track watched Jobs for status reporting
6267 watchedJobs sync.Map // map[string]models.WorkflowId
···175180 logger.Error(err, "Failed to monitor job statuses")
176181 }
177182183183+ // For multi-arch pipelines, create final Jobs once all matrix legs succeed
184184+ if err := r.ensureFinalJobs(ctx, spindleSet); err != nil {
185185+ logger.Error(err, "Failed to ensure final Jobs")
186186+ }
187187+178188 if jobsErr != nil {
179189 return ctrl.Result{}, jobsErr
180190 }
···264274 return backoff.Retry(operation, backoff.WithContext(bo, ctx))
265275}
266276267267-// updateStatus updates the SpindleSet status based on current Jobs
277277+// updateStatus updates the SpindleSet status based on current Jobs.
278278+// Status updates retry on conflict to handle races with concurrent reconciles
279279+// (e.g., triggered by Job creation/updates via Owns).
268280func (r *SpindleSetReconciler) updateStatus(ctx context.Context, spindleSet *loomv1alpha1.SpindleSet) error {
269281 logger := log.FromContext(ctx)
270282271271- // Re-fetch the SpindleSet to get the latest version before updating status
272272- // This avoids optimistic concurrency conflicts when the object was modified
273273- // by another reconciliation loop (e.g., triggered by Job creation/updates)
274274- latestSpindleSet := &loomv1alpha1.SpindleSet{}
275275- if err := r.Get(ctx, client.ObjectKeyFromObject(spindleSet), latestSpindleSet); err != nil {
276276- return fmt.Errorf("failed to fetch latest SpindleSet: %w", err)
277277- }
278278- // Use the latest version for all subsequent operations
279279- spindleSet = latestSpindleSet
283283+ return retry.RetryOnConflict(retry.DefaultRetry, func() error {
284284+ // Re-fetch the SpindleSet on each attempt to get the latest version
285285+ latestSpindleSet := &loomv1alpha1.SpindleSet{}
286286+ if err := r.Get(ctx, client.ObjectKeyFromObject(spindleSet), latestSpindleSet); err != nil {
287287+ return fmt.Errorf("failed to fetch latest SpindleSet: %w", err)
288288+ }
289289+ return r.computeAndApplyStatus(ctx, logger, latestSpindleSet)
290290+ })
291291+}
280292293293+// computeAndApplyStatus recomputes status from Jobs and applies it to spindleSet.
294294+func (r *SpindleSetReconciler) computeAndApplyStatus(ctx context.Context, logger logr.Logger, spindleSet *loomv1alpha1.SpindleSet) error {
281295 // List all Jobs owned by this SpindleSet
282296 jobList := &batchv1.JobList{}
283297 if err := r.List(ctx, jobList, client.InNamespace(spindleSet.Namespace), client.MatchingLabels{
···474488 return fmt.Errorf("failed to list nodes: %w", err)
475489 }
476490477477- // Convert workflow steps to jobbuilder format and create Jobs for each workflow
491491+ // Convert workflow steps to jobbuilder format and create Jobs for each workflow.
492492+ // For multi-arch pipelines, final Jobs are gated on all matrix leg Jobs succeeding.
478493 for _, workflowSpec := range pipelineRun.Workflows {
479479- // Check if Job already exists
480480- jobName := fmt.Sprintf("spindle-%s-%s", pipelineRun.PipelineID, workflowSpec.Name)
481481- if len(jobName) > 63 {
482482- jobName = jobName[:63]
494494+ // For multi-arch: skip final workflows initially — they are created
495495+ // by ensureFinalJobs once all matrix leg Jobs have succeeded.
496496+ if workflowSpec.IsFinal {
497497+ continue
483498 }
484499485485- existingJob := &batchv1.Job{}
486486- err := r.Get(ctx, client.ObjectKey{
487487- Name: jobName,
488488- Namespace: spindleSet.Namespace,
489489- }, existingJob)
500500+ if err := r.createWorkflowJob(ctx, spindleSet, pipelineRun, workflowSpec, secretName, secretKeys, &nodeList); err != nil {
501501+ return err
502502+ }
503503+ }
504504+505505+ return nil
506506+}
// createWorkflowJob creates a single Kubernetes Job for a workflow spec.
//
// The call is idempotent: if a Job with the derived name already exists
// (detected either by the pre-check Get or by an AlreadyExists error on
// create), it returns nil. secretName/secretKeys may be empty when the
// pipeline has no secrets; nodeList is passed to the job builder (presumably
// for architecture-aware scheduling — confirm against jobbuilder.BuildJob).
func (r *SpindleSetReconciler) createWorkflowJob(ctx context.Context, spindleSet *loomv1alpha1.SpindleSet, pipelineRun *loomv1alpha1.PipelineRunSpec, workflowSpec loomv1alpha1.WorkflowSpec, secretName string, secretKeys []string, nodeList *corev1.NodeList) error {
	logger := log.FromContext(ctx)

	// Check if Job already exists. Kubernetes object names are capped at 63
	// characters. NOTE(review): plain truncation can make two long names
	// collide — confirm pipeline ID / workflow name lengths are bounded.
	jobName := fmt.Sprintf("spindle-%s-%s", pipelineRun.PipelineID, workflowSpec.Name)
	if len(jobName) > 63 {
		jobName = jobName[:63]
	}

	existingJob := &batchv1.Job{}
	err := r.Get(ctx, client.ObjectKey{
		Name:      jobName,
		Namespace: spindleSet.Namespace,
	}, existingJob)

	if err == nil {
		// Job already present — nothing to do.
		logger.V(1).Info("Job already exists for workflow", "workflow", workflowSpec.Name, "job", jobName)
		return nil
	}

	if !apierrors.IsNotFound(err) {
		return fmt.Errorf("failed to check for existing job: %w", err)
	}

	// Convert workflow steps to jobbuilder format
	jobSteps := make([]jobbuilder.WorkflowStep, 0, len(workflowSpec.Steps))
	for _, step := range workflowSpec.Steps {
		jobSteps = append(jobSteps, jobbuilder.WorkflowStep{
			Name:    step.Name,
			Command: step.Command,
			Env:     step.Environment,
		})
	}

	// Build Job configuration
	jobConfig := jobbuilder.WorkflowConfig{
		WorkflowName:   workflowSpec.Name,
		PipelineID:     pipelineRun.PipelineID,
		SpindleSetName: spindleSet.Name,
		Image:          workflowSpec.Image,
		LoomImage:      r.LoomImage,
		Architecture:   workflowSpec.Architecture,
		Steps:          jobSteps,
		WorkflowSpec:   workflowSpec, // full spec is passed through to the runner
		CloneCommands:  pipelineRun.CloneCommands,
		SkipClone:      pipelineRun.SkipClone,
		SecretName:     secretName, // name of K8s Secret to inject (empty if no secrets)
		SecretKeys:     secretKeys, // secret env var names, used for log masking
		Template:       spindleSet.Spec.Template,
		Namespace:      spindleSet.Namespace,
		OperatorAddr:   r.OperatorAddr, // gRPC address the runner connects back to
	}

	// Create the Job
	job, err := jobbuilder.BuildJob(jobConfig, nodeList)
	if err != nil {
		return fmt.Errorf("failed to build job for workflow %s: %w", workflowSpec.Name, err)
	}

	// Set SpindleSet as owner of the Job so it is garbage-collected with it.
	if err := controllerutil.SetControllerReference(spindleSet, job, r.Scheme); err != nil {
		return fmt.Errorf("failed to set controller reference: %w", err)
	}

	logger.Info("Creating Job for workflow", "workflow", workflowSpec.Name, "job", job.Name)
	if err := r.retryCreate(ctx, job); err != nil {
		if apierrors.IsAlreadyExists(err) {
			// Lost a race with a concurrent reconcile (or a Job left over
			// from a previous deployment) — treat as success.
			logger.Info("Job already exists, skipping creation", "workflow", workflowSpec.Name, "job", job.Name)
			return nil
		}
		return fmt.Errorf("failed to create job for workflow %s: %w", workflowSpec.Name, err)
	}

	logger.Info("Job created successfully", "workflow", workflowSpec.Name, "job", job.Name)
	return nil
}
// ensureFinalJobs creates final Jobs for multi-arch pipelines once all matrix legs have succeeded.
// This is called during reconciliation after job status monitoring.
//
// It is a no-op when the pipeline is not multi-arch, has no final workflow,
// the final Job already exists, or any matrix leg has not yet succeeded.
func (r *SpindleSetReconciler) ensureFinalJobs(ctx context.Context, spindleSet *loomv1alpha1.SpindleSet) error {
	logger := log.FromContext(ctx)
	pipelineRun := spindleSet.Spec.PipelineRun

	if !pipelineRun.MultiArch {
		return nil
	}

	// Find the final workflow spec
	var finalSpec *loomv1alpha1.WorkflowSpec
	for i := range pipelineRun.Workflows {
		if pipelineRun.Workflows[i].IsFinal {
			finalSpec = &pipelineRun.Workflows[i]
			break
		}
	}
	if finalSpec == nil {
		return nil // No final step defined
	}

	// Check if final Job already exists. The name must be derived exactly as
	// createWorkflowJob derives it, including the 63-char truncation.
	// NOTE(review): a non-NotFound Get error falls through and is treated as
	// "not created yet"; createWorkflowJob's AlreadyExists handling should
	// make that benign — confirm.
	finalJobName := fmt.Sprintf("spindle-%s-%s", pipelineRun.PipelineID, finalSpec.Name)
	if len(finalJobName) > 63 {
		finalJobName = finalJobName[:63]
	}
	existingJob := &batchv1.Job{}
	if err := r.Get(ctx, client.ObjectKey{Name: finalJobName, Namespace: spindleSet.Namespace}, existingJob); err == nil {
		return nil // Already created
	}

	// Check if all matrix leg Jobs have succeeded
	jobList := &batchv1.JobList{}
	if err := r.List(ctx, jobList,
		client.InNamespace(spindleSet.Namespace),
		client.MatchingLabels{"loom.j5t.io/spindleset": spindleSet.Name},
	); err != nil {
		return fmt.Errorf("failed to list jobs: %w", err)
	}

	// Match each owned Job back to its workflow via the workflow label and
	// count only matrix-leg Jobs (not the final Job).
	legCount := 0
	succeededCount := 0
	for _, job := range jobList.Items {
		wfName := job.Labels["loom.j5t.io/workflow"]
		for _, wf := range pipelineRun.Workflows {
			if wf.Name == wfName && wf.IsMatrixLeg {
				legCount++
				if job.Status.Succeeded > 0 {
					succeededCount++
				}
				break
			}
		}
	}

	// Count expected legs from the pipeline spec.
	expectedLegs := 0
	for _, wf := range pipelineRun.Workflows {
		if wf.IsMatrixLeg {
			expectedLegs++
		}
	}

	// Gate: every expected leg must exist AND have succeeded.
	if legCount < expectedLegs || succeededCount < expectedLegs {
		logger.V(1).Info("Waiting for matrix legs to complete",
			"expected", expectedLegs, "found", legCount, "succeeded", succeededCount)
		return nil // Not all legs have completed yet
	}

	logger.Info("All matrix legs succeeded, creating final Job", "legs", expectedLegs)

	// Build secret info. NOTE(review): presumably mirrors the secret-name
	// derivation used when creating the leg Jobs — keep the two in sync.
	secretName := ""
	var secretKeys []string
	if len(pipelineRun.Secrets) > 0 {
		secretName = fmt.Sprintf("%s-secrets", spindleSet.Name)
		for _, s := range pipelineRun.Secrets {
			secretKeys = append(secretKeys, s.Key)
		}
	}

	var nodeList corev1.NodeList
	if err := r.List(ctx, &nodeList); err != nil {
		return fmt.Errorf("failed to list nodes: %w", err)
	}

	return r.createWorkflowJob(ctx, spindleSet, pipelineRun, *finalSpec, secretName, secretKeys, &nodeList)
}
555676556677// cleanupOrphanedJobs cleans up Jobs without a matching SpindleSet
+419-274
internal/engine/kubernetes_engine.go
···11package engine
2233import (
44- "bufio"
54 "context"
66- "encoding/json"
75 "fmt"
88- "io"
96 "maps"
107 "strings"
1111- "sync"
128 "time"
1391410 securejoin "github.com/cyphar/filepath-securejoin"
1511 "gopkg.in/yaml.v3"
1616- batchv1 "k8s.io/api/batch/v1"
1717- corev1 "k8s.io/api/core/v1"
1812 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1919- "k8s.io/client-go/kubernetes"
2013 "k8s.io/client-go/rest"
2114 "sigs.k8s.io/controller-runtime/pkg/client"
2215 "sigs.k8s.io/controller-runtime/pkg/log"
···2619 "tangled.org/core/spindle/secrets"
27202821 loomv1alpha1 "tangled.org/evan.jarrett.net/loom/api/v1alpha1"
2222+ loomgrpc "tangled.org/evan.jarrett.net/loom/internal/grpc"
2923)
30243131-// workflowLogStream holds the state for streaming logs from a workflow's pod
3232-type workflowLogStream struct {
3333- scanner *bufio.Scanner
3434- stream io.ReadCloser
3535- pod *corev1.Pod
3636- podPhase corev1.PodPhase // Track pod phase at stream creation time
// syntheticStep is a minimal implementation of models.Step used to emit
// ControlWriter entries for matrix-leg user steps (which are invisible to
// the upstream spindle framework because loom wraps them in synthetic
// "Matrix build" / "Final" framework steps).
type syntheticStep struct {
	name    string          // human-readable step name
	command string          // shell command the step represents
	kind    models.StepKind // framework classification of the step
}

// Name returns the step's display name.
func (s syntheticStep) Name() string { return s.name }

// Command returns the shell command associated with the step.
func (s syntheticStep) Command() string { return s.command }

// Kind reports the step's models.StepKind classification.
func (s syntheticStep) Kind() models.StepKind { return s.kind }
// matrixLegLogStepID returns a log step id for a matrix leg's user step.
// Ids start at 1000 to stay clear of the framework step ids (0, 1) that the
// upstream engine already emits ("Matrix build", "Final"), and each leg owns
// a 100-wide band. NOTE: collision-freedom assumes fewer than 100 user steps
// per leg — TODO confirm that bound is enforced upstream.
func matrixLegLogStepID(legIdx, userStepIdx int) int {
	const base, legSpan = 1000, 100
	return base + legSpan*legIdx + userStepIdx
}
// finalLogStepID returns a log step id for a final-phase step, placed one
// 100-wide band past the last matrix-leg band so the ranges cannot overlap
// (assuming fewer than 100 steps per leg, as in matrixLegLogStepID).
func finalLogStepID(numLegs, stepIdx int) int {
	const base, legSpan = 1000, 100
	return base + legSpan*(numLegs+1) + stepIdx
}
44514552// KubernetesEngine implements the spindle Engine interface for Kubernetes Jobs.
···4956 namespace string
5057 template loomv1alpha1.SpindleTemplate
5158 vault secrets.Manager
5959+ hub *loomgrpc.Hub
6060+ artifacts *loomgrpc.ArtifactStore
52615362 // Track created SpindleSets for cleanup
5463 spindleSets map[string]*loomv1alpha1.SpindleSet
5555-5656- // Active log streams per workflow - persist across RunStep calls
5757- logStreams map[string]*workflowLogStream
5858- streamMutex sync.RWMutex
5964}
60656166// NewKubernetesEngine creates a new Kubernetes-based spindle engine.
6262-func NewKubernetesEngine(k8sClient client.Client, config *rest.Config, namespace string, template loomv1alpha1.SpindleTemplate, vault secrets.Manager) *KubernetesEngine {
6767+func NewKubernetesEngine(k8sClient client.Client, config *rest.Config, namespace string, template loomv1alpha1.SpindleTemplate, vault secrets.Manager, hub *loomgrpc.Hub, artifacts *loomgrpc.ArtifactStore) *KubernetesEngine {
6368 return &KubernetesEngine{
6469 client: k8sClient,
6570 config: config,
6671 namespace: namespace,
6772 template: template,
6873 vault: vault,
7474+ hub: hub,
7575+ artifacts: artifacts,
6976 spindleSets: make(map[string]*loomv1alpha1.SpindleSet),
7070- logStreams: make(map[string]*workflowLogStream),
7177 }
7278}
73798080+// StringOrSlice is a YAML type that accepts either a single string or an array of strings.
8181+type StringOrSlice []string
8282+8383+func (s *StringOrSlice) UnmarshalYAML(node *yaml.Node) error {
8484+ if node.Kind == yaml.ScalarNode {
8585+ *s = []string{node.Value}
8686+ return nil
8787+ }
8888+ var slice []string
8989+ if err := node.Decode(&slice); err != nil {
9090+ return err
9191+ }
9292+ *s = slice
9393+ return nil
9494+}
// rawWorkflowSpec is used for initial YAML parsing before matrix expansion.
// It handles the polymorphic image/architecture fields.
type rawWorkflowSpec struct {
	Image        StringOrSlice                      `yaml:"image"`        // single image or matrix axis of images
	Architecture StringOrSlice                      `yaml:"architecture"` // single arch or matrix axis of archs
	Steps        []loomv1alpha1.WorkflowStep        `yaml:"steps"`
	When         []loomv1alpha1.WorkflowWhen        `yaml:"when"`
	Environment  map[string]string                  `yaml:"environment"`
	Dependencies *loomv1alpha1.WorkflowDependencies `yaml:"dependencies"`
	Final        *loomv1alpha1.FinalSpec            `yaml:"final"` // optional steps run once after all legs
}
// MatrixLeg represents a single combination of image and architecture.
type MatrixLeg struct {
	Image        string // container image for this leg
	Architecture string // target architecture for this leg (e.g., amd64, arm64)
}
113113+114114+// expandMatrix computes the cartesian product of images and architectures.
115115+func expandMatrix(images, architectures StringOrSlice) []MatrixLeg {
116116+ var legs []MatrixLeg
117117+ for _, img := range images {
118118+ for _, arch := range architectures {
119119+ legs = append(legs, MatrixLeg{Image: img, Architecture: arch})
120120+ }
121121+ }
122122+ return legs
123123+}
// kubernetesWorkflowData holds pre-computed data for workflow execution.
// Built in InitWorkflow, consumed in SetupWorkflow.
type kubernetesWorkflowData struct {
	Spec      loomv1alpha1.WorkflowSpec
	CloneStep models.CloneStep // empty if clone should be skipped

	// Multi-arch fields
	IsMultiArch bool                    // true when the matrix has more than one leg or a final block exists
	MatrixLegs  []MatrixLeg             // expanded image x architecture combinations
	FinalSpec   *loomv1alpha1.FinalSpec // optional steps run once after all legs complete
}
8013681137// SimpleStep implements the models.Step interface.
···100156// InitWorkflow parses the workflow YAML and initializes a Workflow model.
101157// Pipeline environment variables (TANGLED_*) are injected into workflow.Environment
102158// by the framework after this method returns.
159159+//
160160+// For multi-arch workflows (image or architecture is an array), the matrix is expanded
161161+// and stored in kubernetesWorkflowData. The framework sees synthetic steps:
162162+// - Step 0: "Matrix build" (engine fans out to N parallel Jobs, waits for all)
163163+// - Step 1: "Final" (engine creates final Job, waits for completion) — only if final block exists
103164func (e *KubernetesEngine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
104104- // Parse the Raw YAML into the unified WorkflowSpec type
105105- var spec loomv1alpha1.WorkflowSpec
106106- if err := yaml.Unmarshal([]byte(twf.Raw), &spec); err != nil {
165165+ // Parse YAML with polymorphic image/architecture handling
166166+ var raw rawWorkflowSpec
167167+ if err := yaml.Unmarshal([]byte(twf.Raw), &raw); err != nil {
107168 return nil, fmt.Errorf("failed to parse workflow YAML: %w", err)
108169 }
109170110110- // Set the workflow name from the tangled workflow
111111- spec.Name = twf.Name
171171+ if len(raw.Image) == 0 {
172172+ return nil, fmt.Errorf("workflow must specify an 'image' field")
173173+ }
174174+ if len(raw.Architecture) == 0 {
175175+ raw.Architecture = StringOrSlice{"amd64"}
176176+ }
177177+178178+ // Build clone step
179179+ var cloneStep models.CloneStep
180180+ if twf.Clone == nil || !twf.Clone.Skip {
181181+ cloneStep = models.BuildCloneStep(twf, *tpl.TriggerMetadata, false)
182182+ }
183183+184184+ // Determine if this is a multi-arch workflow
185185+ legs := expandMatrix(raw.Image, raw.Architecture)
186186+ isMultiArch := len(legs) > 1 || raw.Final != nil
112187113113- // Validate required fields
114114- if spec.Image == "" {
115115- return nil, fmt.Errorf("workflow must specify an 'image' field")
188188+ if isMultiArch {
189189+ return e.initMultiArchWorkflow(twf.Name, raw, legs, cloneStep)
116190 }
117191118118- // Default architecture to amd64 if not specified
119119- if spec.Architecture == "" {
120120- spec.Architecture = "amd64"
192192+ // Single-arch workflow: existing behavior
193193+ spec := loomv1alpha1.WorkflowSpec{
194194+ Name: twf.Name,
195195+ Image: raw.Image[0],
196196+ Architecture: raw.Architecture[0],
197197+ Steps: raw.Steps,
198198+ When: raw.When,
199199+ Environment: raw.Environment,
200200+ Dependencies: raw.Dependencies,
121201 }
122202123123- // Convert steps to models.Step interface
124203 modelSteps := make([]models.Step, 0, len(spec.Steps))
125204 for _, stepSpec := range spec.Steps {
126205 modelSteps = append(modelSteps, SimpleStep{
···130209 })
131210 }
132211133133- // Build clone step (uses upstream models.BuildCloneStep which is self-contained)
134134- var cloneStep models.CloneStep
135135- devMode := false // TODO: Make this configurable
136136-137137- if twf.Clone == nil || !twf.Clone.Skip {
138138- cloneStep = models.BuildCloneStep(twf, *tpl.TriggerMetadata, devMode)
139139- }
140140-141141- // Store pre-computed workflow data
142212 workflowData := &kubernetesWorkflowData{
143213 Spec: spec,
144214 CloneStep: cloneStep,
145215 }
146216147147- // Set engine-specific environment variables on the workflow
148148- // These will be merged with pipeline env vars by the framework
149217 workflowEnv := map[string]string{
150218 "TANGLED_ARCHITECTURE": spec.Architecture,
151151- // HOME must be writable; we run as user 10000 so default /root won't work
152152- "HOME": "/tmp",
219219+ "HOME": "/tmp",
153220 }
154221155155- workflow := &models.Workflow{
222222+ return &models.Workflow{
156223 Steps: modelSteps,
157224 Name: twf.Name,
158225 Data: workflowData,
159226 Environment: workflowEnv,
227227+ }, nil
228228+}
229229+230230+// initMultiArchWorkflow creates a Workflow with synthetic steps for matrix execution.
231231+func (e *KubernetesEngine) initMultiArchWorkflow(name string, raw rawWorkflowSpec, legs []MatrixLeg, cloneStep models.CloneStep) (*models.Workflow, error) {
232232+ // Use the first leg's architecture for the spec that gets stored
233233+ // (the actual per-leg specs are built in SetupWorkflow)
234234+ spec := loomv1alpha1.WorkflowSpec{
235235+ Name: name,
236236+ Image: raw.Image[0],
237237+ Architecture: raw.Architecture[0],
238238+ Steps: raw.Steps,
239239+ When: raw.When,
240240+ Environment: raw.Environment,
241241+ Dependencies: raw.Dependencies,
242242+ Final: raw.Final,
160243 }
161244162162- return workflow, nil
245245+ // Build synthetic steps that the framework will iterate over
246246+ var modelSteps []models.Step
247247+248248+ // Step 0: matrix build phase (fans out to N parallel Jobs)
249249+ archList := make([]string, len(legs))
250250+ for i, leg := range legs {
251251+ archList[i] = leg.Architecture
252252+ }
253253+ modelSteps = append(modelSteps, SimpleStep{
254254+ StepName: fmt.Sprintf("Matrix build (%s)", strings.Join(archList, ", ")),
255255+ StepCommand: "# internal: matrix fan-out",
256256+ StepKind: models.StepKindUser,
257257+ })
258258+259259+ // Step 1: final phase (if final block exists)
260260+ if raw.Final != nil {
261261+ modelSteps = append(modelSteps, SimpleStep{
262262+ StepName: "Final",
263263+ StepCommand: "# internal: final step",
264264+ StepKind: models.StepKindUser,
265265+ })
266266+ }
267267+268268+ workflowData := &kubernetesWorkflowData{
269269+ Spec: spec,
270270+ CloneStep: cloneStep,
271271+ IsMultiArch: true,
272272+ MatrixLegs: legs,
273273+ FinalSpec: raw.Final,
274274+ }
275275+276276+ workflowEnv := map[string]string{
277277+ "TANGLED_ARCHITECTURE": raw.Architecture[0],
278278+ "HOME": "/tmp",
279279+ }
280280+281281+ return &models.Workflow{
282282+ Steps: modelSteps,
283283+ Name: name,
284284+ Data: workflowData,
285285+ Environment: workflowEnv,
286286+ }, nil
163287}
164288165289// SetupWorkflow creates a SpindleSet CR for the workflow.
···223347 }
224348225349 // Build PipelineRunSpec from pre-computed data
226226- // Knot is extracted from the pipeline ID provided by the framework
227350 skipClone := len(data.CloneStep.Commands()) == 0
351351+ var workflows []loomv1alpha1.WorkflowSpec
352352+353353+ if data.IsMultiArch {
354354+ // Expand matrix into per-leg WorkflowSpecs
355355+ for _, leg := range data.MatrixLegs {
356356+ legSpec := loomv1alpha1.WorkflowSpec{
357357+ Name: fmt.Sprintf("%s-%s", data.Spec.Name, leg.Architecture),
358358+ Image: leg.Image,
359359+ Architecture: leg.Architecture,
360360+ Steps: data.Spec.Steps,
361361+ When: data.Spec.When,
362362+ Environment: maps.Clone(data.Spec.Environment),
363363+ Dependencies: data.Spec.Dependencies,
364364+ IsMatrixLeg: true,
365365+ }
366366+ // Override per-leg env vars
367367+ if legSpec.Environment == nil {
368368+ legSpec.Environment = make(map[string]string)
369369+ }
370370+ legSpec.Environment["TANGLED_ARCHITECTURE"] = leg.Architecture
371371+ legSpec.Environment["TANGLED_IMAGE"] = leg.Image
372372+ legSpec.Environment["LOOM_MATRIX_LEG"] = "true"
373373+ legSpec.Environment["LOOM_ARTIFACTS"] = "/artifacts"
374374+ workflows = append(workflows, legSpec)
375375+ }
376376+377377+ // Add final WorkflowSpec if specified
378378+ if data.FinalSpec != nil {
379379+ finalImage := data.FinalSpec.Image
380380+ if finalImage == "" {
381381+ finalImage = data.MatrixLegs[0].Image
382382+ }
383383+ finalSpec := loomv1alpha1.WorkflowSpec{
384384+ Name: fmt.Sprintf("%s-final", data.Spec.Name),
385385+ Image: finalImage,
386386+ Architecture: data.FinalSpec.Architecture,
387387+ Steps: data.FinalSpec.Steps,
388388+ Environment: maps.Clone(data.Spec.Environment),
389389+ IsFinal: true,
390390+ }
391391+ if finalSpec.Environment == nil {
392392+ finalSpec.Environment = make(map[string]string)
393393+ }
394394+ finalSpec.Environment["TANGLED_ARCHITECTURE"] = data.FinalSpec.Architecture
395395+ finalSpec.Environment["LOOM_FINAL"] = "true"
396396+ finalSpec.Environment["LOOM_ARTIFACTS"] = "/artifacts"
397397+ workflows = append(workflows, finalSpec)
398398+ }
399399+400400+ logger.Info("Expanded multi-arch workflow", "legs", len(data.MatrixLegs), "hasFinal", data.FinalSpec != nil)
401401+ } else {
402402+ singleSpec := data.Spec
403403+ if singleSpec.Environment == nil {
404404+ singleSpec.Environment = make(map[string]string)
405405+ }
406406+ singleSpec.Environment["LOOM_ARTIFACTS"] = "/artifacts"
407407+ workflows = []loomv1alpha1.WorkflowSpec{singleSpec}
408408+ }
409409+228410 pipelineRun := &loomv1alpha1.PipelineRunSpec{
229411 PipelineID: wid.Rkey,
230412 SkipClone: skipClone,
231413 Secrets: repoSecrets,
232232- Workflows: []loomv1alpha1.WorkflowSpec{data.Spec},
414414+ Workflows: workflows,
415415+ MultiArch: data.IsMultiArch,
233416 }
234417235418 // Add clone commands if not skipping
···342525 // Remove from tracking map
343526 delete(e.spindleSets, wid.String())
344527345345- // Close any open log streams for this workflow
346346- e.closeLogStream(wid)
528528+ // Clean up artifacts for this pipeline
529529+ if e.artifacts != nil {
530530+ if err := e.artifacts.Cleanup(wid.PipelineId.AtUri().String()); err != nil {
531531+ logger.Error(err, "Failed to clean up artifacts")
532532+ }
533533+ }
347534348535 logger.Info("SpindleSet cleaned up successfully")
349536 return nil
350537}
351538352352-// RunStep streams logs for the specific step and waits for that step to complete.
353353-// For Kubernetes engine, all steps run in a single Job, but we stream logs incrementally
354354-// as each step executes. Each RunStep call blocks until that step's "end" control event is received.
539539+// RunStep waits for step completion events from the runner via the gRPC hub.
540540+// For single-arch workflows, blocks until the step's "end" control event.
541541+// For multi-arch workflows:
542542+// - idx 0: waits for ALL matrix leg runners to complete all their steps
543543+// - idx 1: waits for the final runner to complete all its steps
355544func (e *KubernetesEngine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, wfSecrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error {
356545 logger := log.FromContext(ctx).WithValues("workflow", wid.Name, "pipeline", wid.Rkey, "step", idx)
357546358358- // Query for the Job created by SpindleSetReconciler (only on first step)
359359- var job *batchv1.Job
360360- if idx == 0 {
361361- spindleSet, err := e.getSpindleSet(ctx, wid)
362362- if err != nil {
363363- return err
364364- }
365365- if spindleSet == nil {
366366- return fmt.Errorf("no SpindleSet found for workflow %s", wid.String())
367367- }
368368-369369- // Wait for Job to be created by controller
370370- deadline := time.Now().Add(5 * time.Minute)
371371- for {
372372- if time.Now().After(deadline) {
373373- return fmt.Errorf("timeout waiting for Job to be created by controller")
374374- }
375375-376376- jobList := &batchv1.JobList{}
377377- err := e.client.List(ctx, jobList,
378378- client.InNamespace(e.namespace),
379379- client.MatchingLabels{
380380- "loom.j5t.io/spindleset": spindleSet.Name,
381381- "loom.j5t.io/workflow": w.Name,
382382- "loom.j5t.io/pipeline-id": wid.Rkey,
383383- })
384384- if err != nil {
385385- return fmt.Errorf("failed to list jobs: %w", err)
386386- }
547547+ data, ok := w.Data.(*kubernetesWorkflowData)
548548+ if !ok {
549549+ return fmt.Errorf("invalid workflow data type")
550550+ }
387551388388- if len(jobList.Items) > 0 {
389389- job = &jobList.Items[0]
390390- break
391391- }
392392-393393- time.Sleep(2 * time.Second)
394394- }
395395-396396- logger.Info("Found Job for workflow", "jobName", job.Name)
552552+ if data.IsMultiArch {
553553+ return e.runMultiArchStep(ctx, wid, data, idx, wfLogger)
397554 }
398555399399- // Get or create log stream (creates on first step, reuses on subsequent steps)
400400- stream, err := e.getOrCreateLogStream(ctx, wid, job)
401401- if err != nil {
402402- return fmt.Errorf("failed to get log stream: %w", err)
556556+ // Single-arch: wait for one runner.
557557+ // PipelineID must match what the runner sends — the framework injects
558558+ // TANGLED_PIPELINE_ID as the full AT URI (see spindle/models.PipelineEnvVars),
559559+ // and the runner uses that value when registering with the hub.
560560+ key := loomgrpc.RunnerKey{
561561+ PipelineID: wid.PipelineId.AtUri().String(),
562562+ WorkflowName: wid.Name,
563563+ Architecture: data.Spec.Architecture,
403564 }
404565405405- // Read from stream until this step's end event
406406- if wfLogger != nil {
407407- logger.Info("Reading logs for step", "stepID", idx)
408408- if err := e.readUntilStepEnd(ctx, stream, idx, w, wfLogger); err != nil {
409409- logger.Error(err, "Failed to read step logs")
410410- // Clean up stream on error
411411- e.closeLogStream(wid)
412412- return fmt.Errorf("failed to read logs for step %d: %w", idx, err)
566566+ if idx == 0 {
567567+ logger.Info("waiting for runner to connect", "key", key.String())
568568+ select {
569569+ case <-e.hub.WaitForRunner(key):
570570+ logger.Info("runner connected", "key", key.String())
571571+ case <-ctx.Done():
572572+ return fmt.Errorf("context canceled while waiting for runner: %w", ctx.Err())
573573+ case <-time.After(10 * time.Minute):
574574+ return fmt.Errorf("timeout waiting for runner to connect")
413575 }
414414- logger.Info("Step completed", "stepID", idx)
415576 }
416577417417- // Clean up stream after last step
418418- if idx == len(w.Steps)-1 {
419419- logger.Info("Last step completed, closing log stream")
420420- e.closeLogStream(wid)
421421- }
578578+ return e.waitForRunnerStep(ctx, key, idx, idx, wfLogger)
579579+}
422580423423- return nil
424424-}
581581+// runMultiArchStep handles RunStep for multi-arch workflows.
582582+func (e *KubernetesEngine) runMultiArchStep(ctx context.Context, wid models.WorkflowId, data *kubernetesWorkflowData, idx int, wfLogger models.WorkflowLogger) error {
583583+ logger := log.FromContext(ctx).WithValues("workflow", wid.Name, "pipeline", wid.Rkey, "step", idx)
425584426426-// getOrCreateLogStream gets an existing log stream or creates a new one for step 0
427427-func (e *KubernetesEngine) getOrCreateLogStream(ctx context.Context, wid models.WorkflowId, job *batchv1.Job) (*workflowLogStream, error) {
428428- widKey := wid.String()
585585+ if idx == 0 {
586586+ // Matrix build phase: wait for all leg runners to complete all their steps
587587+ logger.Info("waiting for matrix leg runners", "legs", len(data.MatrixLegs))
429588430430- // Check if stream already exists
431431- e.streamMutex.RLock()
432432- stream, exists := e.logStreams[widKey]
433433- e.streamMutex.RUnlock()
589589+ type legResult struct {
590590+ leg MatrixLeg
591591+ err error
592592+ }
593593+ results := make(chan legResult, len(data.MatrixLegs))
434594435435- if exists {
436436- return stream, nil
437437- }
595595+ for legIdx, leg := range data.MatrixLegs {
596596+ go func() {
597597+ legName := fmt.Sprintf("%s-%s", data.Spec.Name, leg.Architecture)
598598+ key := loomgrpc.RunnerKey{
599599+ PipelineID: wid.PipelineId.AtUri().String(),
600600+ WorkflowName: legName,
601601+ Architecture: leg.Architecture,
602602+ }
438603439439- // Create new stream
440440- logger := log.FromContext(ctx).WithValues("workflow", wid.Name, "pipeline", wid.Rkey)
604604+ // Wait for this leg's runner to connect
605605+ select {
606606+ case <-e.hub.WaitForRunner(key):
607607+ logger.Info("matrix leg runner connected", "key", key.String())
608608+ case <-ctx.Done():
609609+ results <- legResult{leg: leg, err: ctx.Err()}
610610+ return
611611+ case <-time.After(10 * time.Minute):
612612+ results <- legResult{leg: leg, err: fmt.Errorf("timeout waiting for runner %s", key.String())}
613613+ return
614614+ }
441615442442- // Create kubernetes clientset for log streaming
443443- clientset, err := kubernetes.NewForConfig(e.config)
444444- if err != nil {
445445- return nil, fmt.Errorf("failed to create kubernetes clientset: %w", err)
446446- }
616616+ // Wait for all steps in this leg to complete, emitting per-leg
617617+ // control log lines so each architecture's output gets its own
618618+ // step section in the rendered log.
619619+ for stepIdx, userStep := range data.Spec.Steps {
620620+ logStepID := matrixLegLogStepID(legIdx, stepIdx)
621621+ sStep := syntheticStep{
622622+ name: fmt.Sprintf("%s (%s)", userStep.Name, leg.Architecture),
623623+ command: userStep.Command,
624624+ kind: models.StepKindUser,
625625+ }
626626+ if wfLogger != nil {
627627+ _, _ = wfLogger.ControlWriter(logStepID, sStep, models.StepStatusStart).Write([]byte{0})
628628+ }
629629+ err := e.waitForRunnerStep(ctx, key, stepIdx, logStepID, wfLogger)
630630+ if wfLogger != nil {
631631+ _, _ = wfLogger.ControlWriter(logStepID, sStep, models.StepStatusEnd).Write([]byte{0})
632632+ }
633633+ if err != nil {
634634+ results <- legResult{leg: leg, err: fmt.Errorf("leg %s step %q: %w", leg.Architecture, userStep.Name, err)}
635635+ return
636636+ }
637637+ }
447638448448- // Wait for pod to be created
449449- var pod *corev1.Pod
450450- deadline := time.Now().Add(2 * time.Minute)
451451- for {
452452- if time.Now().After(deadline) {
453453- return nil, fmt.Errorf("timeout waiting for pod to be created")
639639+ results <- legResult{leg: leg, err: nil}
640640+ }()
454641 }
455642456456- pods := &corev1.PodList{}
457457- err := e.client.List(ctx, pods, client.InNamespace(job.Namespace), client.MatchingLabels(job.Spec.Template.Labels))
458458- if err != nil {
459459- return nil, fmt.Errorf("failed to list pods: %w", err)
643643+ // Collect results from all legs
644644+ var errs []error
645645+ for range data.MatrixLegs {
646646+ result := <-results
647647+ if result.err != nil {
648648+ logger.Error(result.err, "matrix leg failed", "arch", result.leg.Architecture)
649649+ errs = append(errs, result.err)
650650+ } else {
651651+ logger.Info("matrix leg completed", "arch", result.leg.Architecture)
652652+ }
460653 }
461654462462- if len(pods.Items) > 0 {
463463- pod = &pods.Items[0]
464464- break
655655+ if len(errs) > 0 {
656656+ return fmt.Errorf("matrix build failed: %v", errs[0])
465657 }
466658467467- time.Sleep(1 * time.Second)
659659+ logger.Info("all matrix legs completed successfully")
660660+ return nil
468661 }
469662470470- logger.Info("Found pod for job", "podName", pod.Name)
471471-472472- // Wait for pod to be running (or completed)
473473- deadline = time.Now().Add(5 * time.Minute)
474474- for {
475475- if time.Now().After(deadline) {
476476- return nil, fmt.Errorf("timeout waiting for pod to start")
663663+ if idx == 1 && data.FinalSpec != nil {
664664+ // Final phase: wait for the final runner
665665+ finalName := fmt.Sprintf("%s-final", data.Spec.Name)
666666+ key := loomgrpc.RunnerKey{
667667+ PipelineID: wid.PipelineId.AtUri().String(),
668668+ WorkflowName: finalName,
669669+ Architecture: data.FinalSpec.Architecture,
477670 }
478671479479- currentPod := &corev1.Pod{}
480480- err := e.client.Get(ctx, client.ObjectKey{
481481- Namespace: pod.Namespace,
482482- Name: pod.Name,
483483- }, currentPod)
484484- if err != nil {
485485- return nil, fmt.Errorf("failed to get pod: %w", err)
672672+ logger.Info("waiting for final runner to connect", "key", key.String())
673673+ select {
674674+ case <-e.hub.WaitForRunner(key):
675675+ logger.Info("final runner connected", "key", key.String())
676676+ case <-ctx.Done():
677677+ return fmt.Errorf("context canceled while waiting for final runner: %w", ctx.Err())
678678+ case <-time.After(10 * time.Minute):
679679+ return fmt.Errorf("timeout waiting for final runner to connect")
486680 }
487681488488- if currentPod.Status.Phase == corev1.PodRunning || currentPod.Status.Phase == corev1.PodSucceeded || currentPod.Status.Phase == corev1.PodFailed {
489489- pod = currentPod
490490- break
682682+ // Stream artifacts from matrix legs to the final runner
683683+ if e.artifacts != nil {
684684+ rs := e.hub.Get(key)
685685+ if rs != nil {
686686+ logger.Info("streaming artifacts to final runner", "pipeline", wid.Rkey)
687687+ if err := e.artifacts.StreamToRunner(wid.PipelineId.AtUri().String(), rs.SendToRunner); err != nil {
688688+ logger.Error(err, "failed to stream artifacts to final runner")
689689+ }
690690+ }
491691 }
492692493493- time.Sleep(1 * time.Second)
494494- }
693693+ // Wait for all final steps, emitting per-step control log lines so the
694694+ // final phase's user steps are visible in the rendered log (the
695695+ // framework only emits a single "Final" control entry above us).
696696+ for stepIdx, finalStep := range data.FinalSpec.Steps {
697697+ logStepID := finalLogStepID(len(data.MatrixLegs), stepIdx)
698698+ sStep := syntheticStep{
699699+ name: fmt.Sprintf("%s (final)", finalStep.Name),
700700+ command: finalStep.Command,
701701+ kind: models.StepKindUser,
702702+ }
703703+ if wfLogger != nil {
704704+ _, _ = wfLogger.ControlWriter(logStepID, sStep, models.StepStatusStart).Write([]byte{0})
705705+ }
706706+ err := e.waitForRunnerStep(ctx, key, stepIdx, logStepID, wfLogger)
707707+ if wfLogger != nil {
708708+ _, _ = wfLogger.ControlWriter(logStepID, sStep, models.StepStatusEnd).Write([]byte{0})
709709+ }
710710+ if err != nil {
711711+ return fmt.Errorf("final step %q: %w", finalStep.Name, err)
712712+ }
713713+ }
495714496496- // Only use Follow mode for running pods. For completed pods, we need to read
497497- // existing logs (Follow:true only streams NEW logs after connection).
498498- shouldFollow := pod.Status.Phase == corev1.PodRunning
499499- if !shouldFollow {
500500- logger.Info("Pod already completed, reading existing logs", "podName", pod.Name, "phase", pod.Status.Phase)
501501- } else {
502502- logger.Info("Pod is running, streaming logs", "podName", pod.Name, "phase", pod.Status.Phase)
715715+ logger.Info("final steps completed")
716716+ return nil
503717 }
504718505505- // Stream logs from the main container (not init containers)
506506- req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{
507507- Container: "runner",
508508- Follow: shouldFollow,
509509- })
510510-511511- logStream, err := req.Stream(ctx)
512512- if err != nil {
513513- return nil, fmt.Errorf("failed to open log stream: %w", err)
514514- }
515515-516516- // Create scanner
517517- scanner := bufio.NewScanner(logStream)
518518- buf := make([]byte, 0, 64*1024)
519519- scanner.Buffer(buf, 1024*1024)
520520-521521- // Create and store stream
522522- stream = &workflowLogStream{
523523- scanner: scanner,
524524- stream: logStream,
525525- pod: pod,
526526- podPhase: pod.Status.Phase,
527527- }
528528-529529- e.streamMutex.Lock()
530530- e.logStreams[widKey] = stream
531531- e.streamMutex.Unlock()
532532-533533- return stream, nil
719719+ return fmt.Errorf("unexpected step index %d for multi-arch workflow", idx)
534720}
535721536536-// closeLogStream closes and removes a log stream
537537-func (e *KubernetesEngine) closeLogStream(wid models.WorkflowId) {
538538- widKey := wid.String()
539539-540540- e.streamMutex.Lock()
541541- defer e.streamMutex.Unlock()
542542-543543- if stream, exists := e.logStreams[widKey]; exists {
544544- stream.stream.Close()
545545- delete(e.logStreams, widKey)
722722+// waitForRunnerStep reads events from a runner's gRPC stream until a specific
723723+// step completes. runnerStepID is matched against StepID fields on runner
724724+// events to filter out other steps' events; logStepID is the step id passed to
725725+// wfLogger.DataWriter when forwarding log content. For single-arch workflows
726726+// these are the same; for matrix legs they differ so each leg's logs land in
727727+// a distinct UI step.
728728+func (e *KubernetesEngine) waitForRunnerStep(ctx context.Context, key loomgrpc.RunnerKey, runnerStepID, logStepID int, wfLogger models.WorkflowLogger) error {
729729+ rs := e.hub.Get(key)
730730+ if rs == nil {
731731+ return fmt.Errorf("runner not connected for %s", key.String())
546732 }
547547-}
548548-549549-// readUntilStepEnd reads from the log stream until the end event for the specified step
550550-func (e *KubernetesEngine) readUntilStepEnd(ctx context.Context, stream *workflowLogStream, stepID int, workflow *models.Workflow, wfLogger models.WorkflowLogger) error {
551551- scanner := stream.scanner
552733553553- for scanner.Scan() {
554554- line := scanner.Text()
555555-556556- // Try to parse as extendedLogLine from the runner binary (includes exit_code)
557557- var logLine extendedLogLine
558558- if err := json.Unmarshal([]byte(line), &logLine); err != nil {
559559- // Not JSON or parse error - skip
560560- continue
561561- }
562562-563563- // Validate step index
564564- if logLine.StepId < 0 || logLine.StepId >= len(workflow.Steps) {
565565- continue
566566- }
567567-568568- // Only process events for the current step
569569- if logLine.StepId != stepID {
570570- // Got event for a different step - this shouldn't happen in sequential execution
571571- // but log it and continue
572572- continue
573573- }
574574-575575- switch logLine.Kind {
576576- case models.LogKindControl:
577577- // Use control events from runner for flow control only
578578- // Don't write them - the core spindle engine writes control events
579579- if logLine.StepStatus == models.StepStatusEnd {
580580- // Check exit code before returning success
581581- if logLine.ExitCode != 0 {
582582- return fmt.Errorf("step %d failed with exit code %d", stepID, logLine.ExitCode)
734734+ for {
735735+ select {
736736+ case evt := <-rs.Steps:
737737+ if evt.StepID != runnerStepID {
738738+ continue
739739+ }
740740+ if evt.Status == "end" {
741741+ if evt.ExitCode != 0 {
742742+ return fmt.Errorf("step %d failed with exit code %d", runnerStepID, evt.ExitCode)
583743 }
584744 return nil
585745 }
586586- // For "start" events, just continue reading
587746588588- case models.LogKindData:
589589- // Log output from step
590590- if logLine.Stream == "" {
591591- logLine.Stream = "stdout" // Default to stdout
747747+ case logEvt := <-rs.Logs:
748748+ if logEvt.StepID != runnerStepID || wfLogger == nil {
749749+ continue
592750 }
593593- dataWriter := wfLogger.DataWriter(logLine.StepId, logLine.Stream)
594594- _, _ = dataWriter.Write([]byte(logLine.Content + "\n"))
595595- }
596596- }
751751+ stream := logEvt.Stream
752752+ if stream == "" {
753753+ stream = "stdout"
754754+ }
755755+ dataWriter := wfLogger.DataWriter(logStepID, stream)
756756+ _, _ = dataWriter.Write([]byte(logEvt.Content + "\n"))
597757598598- if err := scanner.Err(); err != nil {
599599- // EOF or context canceled is expected when pod terminates
600600- if err != io.EOF && !strings.Contains(err.Error(), "context canceled") {
601601- return fmt.Errorf("error reading logs: %w", err)
602602- }
603603- }
758758+ case <-rs.Done:
759759+ return fmt.Errorf("runner disconnected before step %d completed", runnerStepID)
604760605605- // Scanner ended without seeing step end event.
606606- // Re-check current pod status - it may have completed since we started streaming.
607607- currentPod := &corev1.Pod{}
608608- if err := e.client.Get(ctx, client.ObjectKey{Namespace: stream.pod.Namespace, Name: stream.pod.Name}, currentPod); err == nil {
609609- if currentPod.Status.Phase == corev1.PodSucceeded {
610610- // Pod succeeded - treat as success even without control event
611611- return nil
612612- }
613613- if currentPod.Status.Phase == corev1.PodFailed {
614614- return fmt.Errorf("pod failed before step %d completed", stepID)
761761+ case <-ctx.Done():
762762+ return fmt.Errorf("context canceled during step %d: %w", runnerStepID, ctx.Err())
615763 }
616764 }
617617-618618- // Pod status unknown or still running but stream ended unexpectedly
619619- return fmt.Errorf("log stream ended before step %d completed", stepID)
620765}
621766622767// Ensure KubernetesEngine implements the Engine interface
+187
internal/grpc/artifacts.go
···11+package grpc
22+33+import (
44+ "fmt"
55+ "io"
66+ "os"
77+ "path/filepath"
88+ "strings"
99+ "sync"
1010+1111+ "sigs.k8s.io/controller-runtime/pkg/log"
1212+1313+ pb "tangled.org/evan.jarrett.net/loom/internal/pb/loom/v1"
1414+)
// artifactChunkSize is the payload size for streamed artifact chunks (32KB).
const artifactChunkSize = 32 * 1024

// ArtifactStore manages artifact files on disk, organized by
// pipeline/architecture.
type ArtifactStore struct {
	baseDir string
	mu      sync.RWMutex // NOTE(review): appears unused in this file — confirm before removing

	// writers tracks files that are still receiving streamed chunks,
	// keyed by writerKey(pipeline, architecture, path).
	writers   map[string]*os.File
	writersMu sync.Mutex
}
2727+2828+// NewArtifactStore creates a new artifact store at the given base directory.
2929+func NewArtifactStore(baseDir string) (*ArtifactStore, error) {
3030+ if err := os.MkdirAll(baseDir, 0755); err != nil {
3131+ return nil, fmt.Errorf("failed to create artifact store directory: %w", err)
3232+ }
3333+ return &ArtifactStore{
3434+ baseDir: baseDir,
3535+ writers: make(map[string]*os.File),
3636+ }, nil
3737+}
3838+3939+// artifactDir returns the directory for a pipeline/architecture combination.
4040+func (s *ArtifactStore) artifactDir(pipelineID, architecture string) string {
4141+ return filepath.Join(s.baseDir, pipelineID, architecture)
4242+}
// writerKey builds the unique map key used to track an open file writer:
// "<pipelineID>/<architecture>/<path>".
func writerKey(pipelineID, architecture, path string) string {
	return strings.Join([]string{pipelineID, architecture, path}, "/")
}
4848+4949+// WriteChunk writes an artifact chunk to disk. Creates the file on first chunk,
5050+// appends on subsequent chunks, and closes on EOF.
5151+func (s *ArtifactStore) WriteChunk(pipelineID, architecture string, chunk ArtifactEvent) error {
5252+ logger := log.Log.WithName("artifacts")
5353+5454+ dir := s.artifactDir(pipelineID, architecture)
5555+ fullPath := filepath.Join(dir, chunk.Path)
5656+5757+ // Security: ensure the path doesn't escape the artifact directory
5858+ cleanPath, err := filepath.Rel(dir, fullPath)
5959+ if err != nil || strings.HasPrefix(cleanPath, "..") {
6060+ return fmt.Errorf("invalid artifact path: %s", chunk.Path)
6161+ }
6262+6363+ key := writerKey(pipelineID, architecture, chunk.Path)
6464+6565+ s.writersMu.Lock()
6666+ defer s.writersMu.Unlock()
6767+6868+ f, exists := s.writers[key]
6969+ if !exists {
7070+ // Create directory structure and open file
7171+ if err := os.MkdirAll(filepath.Dir(fullPath), 0755); err != nil {
7272+ return fmt.Errorf("failed to create artifact directory: %w", err)
7373+ }
7474+ f, err = os.Create(fullPath)
7575+ if err != nil {
7676+ return fmt.Errorf("failed to create artifact file: %w", err)
7777+ }
7878+ s.writers[key] = f
7979+ logger.Info("receiving artifact", "pipeline", pipelineID, "arch", architecture, "path", chunk.Path)
8080+ }
8181+8282+ if len(chunk.Data) > 0 {
8383+ if _, err := f.Write(chunk.Data); err != nil {
8484+ return fmt.Errorf("failed to write artifact chunk: %w", err)
8585+ }
8686+ }
8787+8888+ if chunk.EOF {
8989+ f.Close()
9090+ delete(s.writers, key)
9191+ logger.Info("artifact received", "pipeline", pipelineID, "arch", architecture, "path", chunk.Path)
9292+ }
9393+9494+ return nil
9595+}
9696+9797+// StreamToRunner sends all artifacts for a pipeline to a runner's SendToRunner channel.
9898+// Used by final jobs to receive artifacts from all matrix legs.
9999+func (s *ArtifactStore) StreamToRunner(pipelineID string, sendCh chan<- *pb.ConnectResponse) error {
100100+ logger := log.Log.WithName("artifacts")
101101+102102+ pipelineDir := filepath.Join(s.baseDir, pipelineID)
103103+ if _, err := os.Stat(pipelineDir); os.IsNotExist(err) {
104104+ logger.Info("no artifacts found for pipeline", "pipeline", pipelineID)
105105+ return nil
106106+ }
107107+108108+ // Walk each architecture directory
109109+ archDirs, err := os.ReadDir(pipelineDir)
110110+ if err != nil {
111111+ return fmt.Errorf("failed to read pipeline artifacts: %w", err)
112112+ }
113113+114114+ for _, archDir := range archDirs {
115115+ if !archDir.IsDir() {
116116+ continue
117117+ }
118118+ arch := archDir.Name()
119119+ archPath := filepath.Join(pipelineDir, arch)
120120+121121+ err := filepath.Walk(archPath, func(path string, info os.FileInfo, err error) error {
122122+ if err != nil || info.IsDir() {
123123+ return err
124124+ }
125125+126126+ relPath, err := filepath.Rel(archPath, path)
127127+ if err != nil {
128128+ return err
129129+ }
130130+131131+ f, err := os.Open(path)
132132+ if err != nil {
133133+ return fmt.Errorf("failed to open artifact %s: %w", path, err)
134134+ }
135135+ defer f.Close()
136136+137137+ buf := make([]byte, artifactChunkSize)
138138+ for {
139139+ n, readErr := f.Read(buf)
140140+ isEOF := readErr == io.EOF
141141+142142+ sendCh <- &pb.ConnectResponse{
143143+ Event: &pb.ConnectResponse_ArtifactData{
144144+ ArtifactData: &pb.ArtifactData{
145145+ SourceArchitecture: arch,
146146+ Path: relPath,
147147+ Data: buf[:n],
148148+ Eof: isEOF,
149149+ },
150150+ },
151151+ }
152152+153153+ if isEOF {
154154+ break
155155+ }
156156+ if readErr != nil {
157157+ return fmt.Errorf("failed to read artifact %s: %w", path, readErr)
158158+ }
159159+ }
160160+161161+ logger.Info("streamed artifact to runner", "pipeline", pipelineID, "arch", arch, "path", relPath)
162162+ return nil
163163+ })
164164+ if err != nil {
165165+ return err
166166+ }
167167+ }
168168+169169+ // Send a sentinel message so the runner knows artifact streaming is complete.
170170+ // An empty Path with Eof=true signals "all artifacts sent".
171171+ sendCh <- &pb.ConnectResponse{
172172+ Event: &pb.ConnectResponse_ArtifactData{
173173+ ArtifactData: &pb.ArtifactData{
174174+ Path: "",
175175+ Eof: true,
176176+ },
177177+ },
178178+ }
179179+180180+ return nil
181181+}
182182+183183+// Cleanup removes all artifacts for a pipeline.
184184+func (s *ArtifactStore) Cleanup(pipelineID string) error {
185185+ dir := filepath.Join(s.baseDir, pipelineID)
186186+ return os.RemoveAll(dir)
187187+}
+142
internal/grpc/hub.go
···11+package grpc
22+33+import (
44+ "fmt"
55+ "sync"
66+77+ pb "tangled.org/evan.jarrett.net/loom/internal/pb/loom/v1"
88+)
// RunnerKey uniquely identifies a runner connection.
type RunnerKey struct {
	PipelineID   string
	WorkflowName string
	Architecture string
}

// String renders the key as "pipeline/workflow/architecture", the form used
// as the map key for streams and waiters.
func (k RunnerKey) String() string {
	return k.PipelineID + "/" + k.WorkflowName + "/" + k.Architecture
}
// StepEvent represents a step lifecycle event received from a runner.
type StepEvent struct {
	StepID int
	Status string // "start" or "end"
	// ExitCode is the step command's exit code; per the proto definition it is
	// only meaningful when Status == "end".
	ExitCode int
}

// LogEvent represents a log line received from a runner.
type LogEvent struct {
	StepID  int
	Stream  string // "stdout" or "stderr"
	Content string
}

// ArtifactEvent represents an artifact chunk received from a runner.
// Path is relative to the artifacts directory; EOF marks the final chunk of a file.
type ArtifactEvent struct {
	Path string
	Data []byte
	EOF  bool
}
4141+4242+// RunnerStream holds the channels for a single runner connection.
4343+// The gRPC server writes to these channels; the engine reads from them.
4444+type RunnerStream struct {
4545+ Steps chan StepEvent
4646+ Logs chan LogEvent
4747+4848+ // SendToRunner allows the engine to send messages back to the runner.
4949+ // The gRPC server reads from this channel and sends to the runner.
5050+ SendToRunner chan *pb.ConnectResponse
5151+5252+ // Done is closed when the runner disconnects.
5353+ Done chan struct{}
5454+}
5555+5656+func newRunnerStream() *RunnerStream {
5757+ return &RunnerStream{
5858+ Steps: make(chan StepEvent, 64),
5959+ Logs: make(chan LogEvent, 256),
6060+ SendToRunner: make(chan *pb.ConnectResponse, 64),
6161+ Done: make(chan struct{}),
6262+ }
6363+}
6464+6565+// Hub manages active runner connections and provides channels for the engine
6666+// to consume events from runners.
6767+type Hub struct {
6868+ mu sync.RWMutex
6969+ streams map[string]*RunnerStream
7070+7171+ // waiters are channels that get notified when a runner with a given key connects.
7272+ waitersMu sync.Mutex
7373+ waiters map[string][]chan struct{}
7474+}
7575+7676+// NewHub creates a new Hub.
7777+func NewHub() *Hub {
7878+ return &Hub{
7979+ streams: make(map[string]*RunnerStream),
8080+ waiters: make(map[string][]chan struct{}),
8181+ }
8282+}
8383+8484+// Register creates a new RunnerStream for the given key.
8585+// Called by the gRPC server when a runner connects.
8686+func (h *Hub) Register(key RunnerKey) *RunnerStream {
8787+ h.mu.Lock()
8888+ stream := newRunnerStream()
8989+ h.streams[key.String()] = stream
9090+ h.mu.Unlock()
9191+9292+ // Notify any waiters
9393+ h.waitersMu.Lock()
9494+ if waiters, ok := h.waiters[key.String()]; ok {
9595+ for _, ch := range waiters {
9696+ close(ch)
9797+ }
9898+ delete(h.waiters, key.String())
9999+ }
100100+ h.waitersMu.Unlock()
101101+102102+ return stream
103103+}
104104+105105+// Unregister removes a runner stream and closes its Done channel.
106106+// Called by the gRPC server when a runner disconnects.
107107+func (h *Hub) Unregister(key RunnerKey) {
108108+ h.mu.Lock()
109109+ defer h.mu.Unlock()
110110+111111+ if stream, ok := h.streams[key.String()]; ok {
112112+ close(stream.Done)
113113+ delete(h.streams, key.String())
114114+ }
115115+}
116116+117117+// Get returns the RunnerStream for the given key, or nil if not connected.
118118+func (h *Hub) Get(key RunnerKey) *RunnerStream {
119119+ h.mu.RLock()
120120+ defer h.mu.RUnlock()
121121+ return h.streams[key.String()]
122122+}
123123+124124+// WaitForRunner returns a channel that is closed when a runner with the given key connects.
125125+// If the runner is already connected, returns a closed channel immediately.
126126+func (h *Hub) WaitForRunner(key RunnerKey) <-chan struct{} {
127127+ h.mu.RLock()
128128+ if _, ok := h.streams[key.String()]; ok {
129129+ h.mu.RUnlock()
130130+ ch := make(chan struct{})
131131+ close(ch)
132132+ return ch
133133+ }
134134+ h.mu.RUnlock()
135135+136136+ h.waitersMu.Lock()
137137+ defer h.waitersMu.Unlock()
138138+139139+ ch := make(chan struct{})
140140+ h.waiters[key.String()] = append(h.waiters[key.String()], ch)
141141+ return ch
142142+}
+136
internal/grpc/server.go
···11+package grpc
22+33+import (
44+ "fmt"
55+ "io"
66+ "net"
77+88+ "google.golang.org/grpc"
99+ "sigs.k8s.io/controller-runtime/pkg/log"
1010+1111+ pb "tangled.org/evan.jarrett.net/loom/internal/pb/loom/v1"
1212+)
// Server is the gRPC server that accepts runner connections.
type Server struct {
	pb.UnimplementedLoomRunnerServiceServer

	// hub routes runner events to the engine.
	hub *Hub
	// artifacts persists artifact chunks received from runners; may be nil,
	// in which case artifact chunks are ignored (see processEvent).
	artifacts  *ArtifactStore
	grpcServer *grpc.Server
}
2222+2323+// NewServer creates a new gRPC server with the given hub and artifact store.
2424+func NewServer(hub *Hub, artifacts *ArtifactStore) *Server {
2525+ s := &Server{
2626+ hub: hub,
2727+ artifacts: artifacts,
2828+ grpcServer: grpc.NewServer(),
2929+ }
3030+ pb.RegisterLoomRunnerServiceServer(s.grpcServer, s)
3131+ return s
3232+}
3333+3434+// Serve starts the gRPC server on the given listener.
3535+func (s *Server) Serve(lis net.Listener) error {
3636+ return s.grpcServer.Serve(lis)
3737+}
// GracefulStop gracefully stops the gRPC server.
// Delegates to grpc.Server.GracefulStop, which stops accepting new
// connections and waits for pending RPCs to complete.
func (s *Server) GracefulStop() {
	s.grpcServer.GracefulStop()
}
// Connect handles a bidirectional stream from a runner.
//
// Protocol: the first message must carry the identity triple
// (pipeline_id, workflow_name, architecture); it may also carry an event,
// which is processed like any later message. The runner is registered with
// the hub for its lifetime; a goroutine forwards engine messages from
// rs.SendToRunner back over the stream, and the main loop receives runner
// events until EOF or error. Returns nil on clean disconnect (io.EOF).
func (s *Server) Connect(stream grpc.BidiStreamingServer[pb.ConnectRequest, pb.ConnectResponse]) error {
	logger := log.Log.WithName("grpc")

	// First message must contain identity fields
	msg, err := stream.Recv()
	if err != nil {
		return fmt.Errorf("failed to receive initial message: %w", err)
	}

	key := RunnerKey{
		PipelineID:   msg.PipelineId,
		WorkflowName: msg.WorkflowName,
		Architecture: msg.Architecture,
	}

	if key.PipelineID == "" || key.WorkflowName == "" || key.Architecture == "" {
		return fmt.Errorf("first message must include pipeline_id, workflow_name, and architecture")
	}

	logger.Info("runner connected", "key", key.String())

	// Register this runner; the deferred Unregister closes rs.Done, which
	// also terminates the sender goroutine below.
	rs := s.hub.Register(key)
	defer func() {
		s.hub.Unregister(key)
		logger.Info("runner disconnected", "key", key.String())
	}()

	// Process the first message's event (if any)
	s.processEvent(key, rs, msg)

	// Start goroutine to send responses back to the runner.
	// It exits when SendToRunner is closed, a Send fails, or Done is closed.
	go func() {
		for {
			select {
			case resp, ok := <-rs.SendToRunner:
				if !ok {
					return
				}
				if err := stream.Send(resp); err != nil {
					logger.Error(err, "failed to send to runner", "key", key.String())
					return
				}
			case <-rs.Done:
				return
			}
		}
	}()

	// Read events from the runner until the stream ends.
	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return fmt.Errorf("recv error from %s: %w", key.String(), err)
		}
		s.processEvent(key, rs, msg)
	}
}
106106+107107+// processEvent routes a runner event to the appropriate channel.
108108+func (s *Server) processEvent(key RunnerKey, rs *RunnerStream, msg *pb.ConnectRequest) {
109109+ logger := log.Log.WithName("grpc")
110110+111111+ switch evt := msg.Event.(type) {
112112+ case *pb.ConnectRequest_StepControl:
113113+ rs.Steps <- StepEvent{
114114+ StepID: int(evt.StepControl.StepId),
115115+ Status: evt.StepControl.Status,
116116+ ExitCode: int(evt.StepControl.ExitCode),
117117+ }
118118+ case *pb.ConnectRequest_LogLine:
119119+ rs.Logs <- LogEvent{
120120+ StepID: int(evt.LogLine.StepId),
121121+ Stream: evt.LogLine.Stream,
122122+ Content: evt.LogLine.Content,
123123+ }
124124+ case *pb.ConnectRequest_ArtifactChunk:
125125+ // Persist artifact to disk; final jobs stream it back via StreamToRunner.
126126+ if s.artifacts != nil {
127127+ if err := s.artifacts.WriteChunk(key.PipelineID, key.Architecture, ArtifactEvent{
128128+ Path: evt.ArtifactChunk.Path,
129129+ Data: evt.ArtifactChunk.Data,
130130+ EOF: evt.ArtifactChunk.Eof,
131131+ }); err != nil {
132132+ logger.Error(err, "failed to write artifact chunk", "key", key.String())
133133+ }
134134+ }
135135+ }
136136+}
+18
internal/jobbuilder/job_template.go
···67676868 // Namespace is the Kubernetes namespace for the Job
6969 Namespace string
7070+7171+ // OperatorAddr is the gRPC address of the Loom operator for runner communication.
7272+ // Injected as LOOM_OPERATOR_ADDR env var into the runner container.
7373+ OperatorAddr string
7074}
71757276// nodeMatchesSelector returns true if at least one node has all the labels in selector.
···255259 corev1.EnvVar{
256260 Name: "LOOM_SECRET_KEYS",
257261 Value: strings.Join(config.SecretKeys, ","),
262262+ },
263263+ corev1.EnvVar{
264264+ Name: "LOOM_OPERATOR_ADDR",
265265+ Value: config.OperatorAddr,
258266 },
259267 ),
260268···510518 MountPath: "/home/runner",
511519 SubPath: "runner",
512520 },
521521+ {
522522+ Name: "artifacts",
523523+ MountPath: "/artifacts",
524524+ },
513525 }
514526515527 // Mount registry credentials if specified
···559571 },
560572 {
561573 Name: "home-override",
574574+ VolumeSource: corev1.VolumeSource{
575575+ EmptyDir: &corev1.EmptyDirVolumeSource{},
576576+ },
577577+ },
578578+ {
579579+ Name: "artifacts",
562580 VolumeSource: corev1.VolumeSource{
563581 EmptyDir: &corev1.EmptyDirVolumeSource{},
564582 },
+635
internal/pb/loom/v1/loom.pb.go
···11+// Code generated by protoc-gen-go. DO NOT EDIT.
22+// versions:
33+// protoc-gen-go v1.36.8
44+// protoc (unknown)
55+// source: loom/v1/loom.proto
66+77+package loomv1
88+99+import (
1010+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
1111+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
1212+ reflect "reflect"
1313+ sync "sync"
1414+ unsafe "unsafe"
1515+)
1616+1717+const (
1818+ // Verify that this generated code is sufficiently up-to-date.
1919+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
2020+ // Verify that runtime/protoimpl is sufficiently up-to-date.
2121+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
2222+)
2323+2424+// ConnectRequest is sent from the runner to the operator.
2525+type ConnectRequest struct {
2626+ state protoimpl.MessageState `protogen:"open.v1"`
2727+ // Identity fields — must be set on the first message, optional on subsequent.
2828+ PipelineId string `protobuf:"bytes,1,opt,name=pipeline_id,json=pipelineId,proto3" json:"pipeline_id,omitempty"`
2929+ WorkflowName string `protobuf:"bytes,2,opt,name=workflow_name,json=workflowName,proto3" json:"workflow_name,omitempty"`
3030+ Architecture string `protobuf:"bytes,3,opt,name=architecture,proto3" json:"architecture,omitempty"`
3131+ // Types that are valid to be assigned to Event:
3232+ //
3333+ // *ConnectRequest_StepControl
3434+ // *ConnectRequest_LogLine
3535+ // *ConnectRequest_ArtifactChunk
3636+ Event isConnectRequest_Event `protobuf_oneof:"event"`
3737+ unknownFields protoimpl.UnknownFields
3838+ sizeCache protoimpl.SizeCache
3939+}
4040+4141+func (x *ConnectRequest) Reset() {
4242+ *x = ConnectRequest{}
4343+ mi := &file_loom_v1_loom_proto_msgTypes[0]
4444+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
4545+ ms.StoreMessageInfo(mi)
4646+}
4747+4848+func (x *ConnectRequest) String() string {
4949+ return protoimpl.X.MessageStringOf(x)
5050+}
5151+5252+func (*ConnectRequest) ProtoMessage() {}
5353+5454+func (x *ConnectRequest) ProtoReflect() protoreflect.Message {
5555+ mi := &file_loom_v1_loom_proto_msgTypes[0]
5656+ if x != nil {
5757+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
5858+ if ms.LoadMessageInfo() == nil {
5959+ ms.StoreMessageInfo(mi)
6060+ }
6161+ return ms
6262+ }
6363+ return mi.MessageOf(x)
6464+}
6565+6666+// Deprecated: Use ConnectRequest.ProtoReflect.Descriptor instead.
6767+func (*ConnectRequest) Descriptor() ([]byte, []int) {
6868+ return file_loom_v1_loom_proto_rawDescGZIP(), []int{0}
6969+}
7070+7171+func (x *ConnectRequest) GetPipelineId() string {
7272+ if x != nil {
7373+ return x.PipelineId
7474+ }
7575+ return ""
7676+}
7777+7878+func (x *ConnectRequest) GetWorkflowName() string {
7979+ if x != nil {
8080+ return x.WorkflowName
8181+ }
8282+ return ""
8383+}
8484+8585+func (x *ConnectRequest) GetArchitecture() string {
8686+ if x != nil {
8787+ return x.Architecture
8888+ }
8989+ return ""
9090+}
9191+9292+func (x *ConnectRequest) GetEvent() isConnectRequest_Event {
9393+ if x != nil {
9494+ return x.Event
9595+ }
9696+ return nil
9797+}
9898+9999+func (x *ConnectRequest) GetStepControl() *StepControl {
100100+ if x != nil {
101101+ if x, ok := x.Event.(*ConnectRequest_StepControl); ok {
102102+ return x.StepControl
103103+ }
104104+ }
105105+ return nil
106106+}
107107+108108+func (x *ConnectRequest) GetLogLine() *LogLine {
109109+ if x != nil {
110110+ if x, ok := x.Event.(*ConnectRequest_LogLine); ok {
111111+ return x.LogLine
112112+ }
113113+ }
114114+ return nil
115115+}
116116+117117+func (x *ConnectRequest) GetArtifactChunk() *ArtifactChunk {
118118+ if x != nil {
119119+ if x, ok := x.Event.(*ConnectRequest_ArtifactChunk); ok {
120120+ return x.ArtifactChunk
121121+ }
122122+ }
123123+ return nil
124124+}
125125+126126+type isConnectRequest_Event interface {
127127+ isConnectRequest_Event()
128128+}
129129+130130+type ConnectRequest_StepControl struct {
131131+ StepControl *StepControl `protobuf:"bytes,4,opt,name=step_control,json=stepControl,proto3,oneof"`
132132+}
133133+134134+type ConnectRequest_LogLine struct {
135135+ LogLine *LogLine `protobuf:"bytes,5,opt,name=log_line,json=logLine,proto3,oneof"`
136136+}
137137+138138+type ConnectRequest_ArtifactChunk struct {
139139+ ArtifactChunk *ArtifactChunk `protobuf:"bytes,6,opt,name=artifact_chunk,json=artifactChunk,proto3,oneof"`
140140+}
141141+142142+func (*ConnectRequest_StepControl) isConnectRequest_Event() {}
143143+144144+func (*ConnectRequest_LogLine) isConnectRequest_Event() {}
145145+146146+func (*ConnectRequest_ArtifactChunk) isConnectRequest_Event() {}
147147+148148+// ConnectResponse is sent from the operator to the runner.
149149+type ConnectResponse struct {
150150+ state protoimpl.MessageState `protogen:"open.v1"`
151151+ // Types that are valid to be assigned to Event:
152152+ //
153153+ // *ConnectResponse_Ack
154154+ // *ConnectResponse_ArtifactData
155155+ Event isConnectResponse_Event `protobuf_oneof:"event"`
156156+ unknownFields protoimpl.UnknownFields
157157+ sizeCache protoimpl.SizeCache
158158+}
159159+160160+func (x *ConnectResponse) Reset() {
161161+ *x = ConnectResponse{}
162162+ mi := &file_loom_v1_loom_proto_msgTypes[1]
163163+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
164164+ ms.StoreMessageInfo(mi)
165165+}
166166+167167+func (x *ConnectResponse) String() string {
168168+ return protoimpl.X.MessageStringOf(x)
169169+}
170170+171171+func (*ConnectResponse) ProtoMessage() {}
172172+173173+func (x *ConnectResponse) ProtoReflect() protoreflect.Message {
174174+ mi := &file_loom_v1_loom_proto_msgTypes[1]
175175+ if x != nil {
176176+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
177177+ if ms.LoadMessageInfo() == nil {
178178+ ms.StoreMessageInfo(mi)
179179+ }
180180+ return ms
181181+ }
182182+ return mi.MessageOf(x)
183183+}
184184+185185+// Deprecated: Use ConnectResponse.ProtoReflect.Descriptor instead.
186186+func (*ConnectResponse) Descriptor() ([]byte, []int) {
187187+ return file_loom_v1_loom_proto_rawDescGZIP(), []int{1}
188188+}
189189+190190+func (x *ConnectResponse) GetEvent() isConnectResponse_Event {
191191+ if x != nil {
192192+ return x.Event
193193+ }
194194+ return nil
195195+}
196196+197197+func (x *ConnectResponse) GetAck() *Ack {
198198+ if x != nil {
199199+ if x, ok := x.Event.(*ConnectResponse_Ack); ok {
200200+ return x.Ack
201201+ }
202202+ }
203203+ return nil
204204+}
205205+206206+func (x *ConnectResponse) GetArtifactData() *ArtifactData {
207207+ if x != nil {
208208+ if x, ok := x.Event.(*ConnectResponse_ArtifactData); ok {
209209+ return x.ArtifactData
210210+ }
211211+ }
212212+ return nil
213213+}
214214+215215+type isConnectResponse_Event interface {
216216+ isConnectResponse_Event()
217217+}
218218+219219+type ConnectResponse_Ack struct {
220220+ Ack *Ack `protobuf:"bytes,1,opt,name=ack,proto3,oneof"`
221221+}
222222+223223+type ConnectResponse_ArtifactData struct {
224224+ ArtifactData *ArtifactData `protobuf:"bytes,2,opt,name=artifact_data,json=artifactData,proto3,oneof"`
225225+}
226226+227227+func (*ConnectResponse_Ack) isConnectResponse_Event() {}
228228+229229+func (*ConnectResponse_ArtifactData) isConnectResponse_Event() {}
230230+231231+// StepControl signals step lifecycle transitions.
232232+type StepControl struct {
233233+ state protoimpl.MessageState `protogen:"open.v1"`
234234+ StepId int32 `protobuf:"varint,1,opt,name=step_id,json=stepId,proto3" json:"step_id,omitempty"`
235235+ // "start" or "end"
236236+ Status string `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"`
237237+ // Exit code of the step command (only meaningful when status = "end").
238238+ ExitCode int32 `protobuf:"varint,3,opt,name=exit_code,json=exitCode,proto3" json:"exit_code,omitempty"`
239239+ unknownFields protoimpl.UnknownFields
240240+ sizeCache protoimpl.SizeCache
241241+}
242242+243243+func (x *StepControl) Reset() {
244244+ *x = StepControl{}
245245+ mi := &file_loom_v1_loom_proto_msgTypes[2]
246246+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
247247+ ms.StoreMessageInfo(mi)
248248+}
249249+250250+func (x *StepControl) String() string {
251251+ return protoimpl.X.MessageStringOf(x)
252252+}
253253+254254+func (*StepControl) ProtoMessage() {}
255255+256256+func (x *StepControl) ProtoReflect() protoreflect.Message {
257257+ mi := &file_loom_v1_loom_proto_msgTypes[2]
258258+ if x != nil {
259259+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
260260+ if ms.LoadMessageInfo() == nil {
261261+ ms.StoreMessageInfo(mi)
262262+ }
263263+ return ms
264264+ }
265265+ return mi.MessageOf(x)
266266+}
267267+268268+// Deprecated: Use StepControl.ProtoReflect.Descriptor instead.
269269+func (*StepControl) Descriptor() ([]byte, []int) {
270270+ return file_loom_v1_loom_proto_rawDescGZIP(), []int{2}
271271+}
272272+273273+func (x *StepControl) GetStepId() int32 {
274274+ if x != nil {
275275+ return x.StepId
276276+ }
277277+ return 0
278278+}
279279+280280+func (x *StepControl) GetStatus() string {
281281+ if x != nil {
282282+ return x.Status
283283+ }
284284+ return ""
285285+}
286286+287287+func (x *StepControl) GetExitCode() int32 {
288288+ if x != nil {
289289+ return x.ExitCode
290290+ }
291291+ return 0
292292+}
293293+294294+// LogLine carries a single line of step output.
295295+type LogLine struct {
296296+ state protoimpl.MessageState `protogen:"open.v1"`
297297+ StepId int32 `protobuf:"varint,1,opt,name=step_id,json=stepId,proto3" json:"step_id,omitempty"`
298298+ // "stdout" or "stderr"
299299+ Stream string `protobuf:"bytes,2,opt,name=stream,proto3" json:"stream,omitempty"`
300300+ Content string `protobuf:"bytes,3,opt,name=content,proto3" json:"content,omitempty"`
301301+ unknownFields protoimpl.UnknownFields
302302+ sizeCache protoimpl.SizeCache
303303+}
304304+305305+func (x *LogLine) Reset() {
306306+ *x = LogLine{}
307307+ mi := &file_loom_v1_loom_proto_msgTypes[3]
308308+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
309309+ ms.StoreMessageInfo(mi)
310310+}
311311+312312+func (x *LogLine) String() string {
313313+ return protoimpl.X.MessageStringOf(x)
314314+}
315315+316316+func (*LogLine) ProtoMessage() {}
317317+318318+func (x *LogLine) ProtoReflect() protoreflect.Message {
319319+ mi := &file_loom_v1_loom_proto_msgTypes[3]
320320+ if x != nil {
321321+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
322322+ if ms.LoadMessageInfo() == nil {
323323+ ms.StoreMessageInfo(mi)
324324+ }
325325+ return ms
326326+ }
327327+ return mi.MessageOf(x)
328328+}
329329+330330+// Deprecated: Use LogLine.ProtoReflect.Descriptor instead.
331331+func (*LogLine) Descriptor() ([]byte, []int) {
332332+ return file_loom_v1_loom_proto_rawDescGZIP(), []int{3}
333333+}
334334+335335+func (x *LogLine) GetStepId() int32 {
336336+ if x != nil {
337337+ return x.StepId
338338+ }
339339+ return 0
340340+}
341341+342342+func (x *LogLine) GetStream() string {
343343+ if x != nil {
344344+ return x.Stream
345345+ }
346346+ return ""
347347+}
348348+349349+func (x *LogLine) GetContent() string {
350350+ if x != nil {
351351+ return x.Content
352352+ }
353353+ return ""
354354+}
355355+356356+// ArtifactChunk streams a file from runner to operator in chunks.
357357+// The runner sends one or more chunks per file, with eof=true on the last chunk.
358358+type ArtifactChunk struct {
359359+ state protoimpl.MessageState `protogen:"open.v1"`
360360+ // Relative path within the artifacts directory (e.g., "bin/myapp").
361361+ Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
362362+ Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
363363+ Eof bool `protobuf:"varint,3,opt,name=eof,proto3" json:"eof,omitempty"`
364364+ unknownFields protoimpl.UnknownFields
365365+ sizeCache protoimpl.SizeCache
366366+}
367367+368368+func (x *ArtifactChunk) Reset() {
369369+ *x = ArtifactChunk{}
370370+ mi := &file_loom_v1_loom_proto_msgTypes[4]
371371+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
372372+ ms.StoreMessageInfo(mi)
373373+}
374374+375375+func (x *ArtifactChunk) String() string {
376376+ return protoimpl.X.MessageStringOf(x)
377377+}
378378+379379+func (*ArtifactChunk) ProtoMessage() {}
380380+381381+func (x *ArtifactChunk) ProtoReflect() protoreflect.Message {
382382+ mi := &file_loom_v1_loom_proto_msgTypes[4]
383383+ if x != nil {
384384+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
385385+ if ms.LoadMessageInfo() == nil {
386386+ ms.StoreMessageInfo(mi)
387387+ }
388388+ return ms
389389+ }
390390+ return mi.MessageOf(x)
391391+}
392392+393393+// Deprecated: Use ArtifactChunk.ProtoReflect.Descriptor instead.
394394+func (*ArtifactChunk) Descriptor() ([]byte, []int) {
395395+ return file_loom_v1_loom_proto_rawDescGZIP(), []int{4}
396396+}
397397+398398+func (x *ArtifactChunk) GetPath() string {
399399+ if x != nil {
400400+ return x.Path
401401+ }
402402+ return ""
403403+}
404404+405405+func (x *ArtifactChunk) GetData() []byte {
406406+ if x != nil {
407407+ return x.Data
408408+ }
409409+ return nil
410410+}
411411+412412+func (x *ArtifactChunk) GetEof() bool {
413413+ if x != nil {
414414+ return x.Eof
415415+ }
416416+ return false
417417+}
418418+419419+// ArtifactData streams artifact files from operator to runner (for final jobs).
420420+// The operator sends collected artifacts from matrix legs to the final runner.
421421+type ArtifactData struct {
422422+ state protoimpl.MessageState `protogen:"open.v1"`
423423+ // Architecture of the source matrix leg (e.g., "amd64").
424424+ SourceArchitecture string `protobuf:"bytes,1,opt,name=source_architecture,json=sourceArchitecture,proto3" json:"source_architecture,omitempty"`
425425+ // Relative path within the artifacts directory.
426426+ Path string `protobuf:"bytes,2,opt,name=path,proto3" json:"path,omitempty"`
427427+ Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
428428+ Eof bool `protobuf:"varint,4,opt,name=eof,proto3" json:"eof,omitempty"`
429429+ unknownFields protoimpl.UnknownFields
430430+ sizeCache protoimpl.SizeCache
431431+}
432432+433433+func (x *ArtifactData) Reset() {
434434+ *x = ArtifactData{}
435435+ mi := &file_loom_v1_loom_proto_msgTypes[5]
436436+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
437437+ ms.StoreMessageInfo(mi)
438438+}
439439+440440+func (x *ArtifactData) String() string {
441441+ return protoimpl.X.MessageStringOf(x)
442442+}
443443+444444+func (*ArtifactData) ProtoMessage() {}
445445+446446+func (x *ArtifactData) ProtoReflect() protoreflect.Message {
447447+ mi := &file_loom_v1_loom_proto_msgTypes[5]
448448+ if x != nil {
449449+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
450450+ if ms.LoadMessageInfo() == nil {
451451+ ms.StoreMessageInfo(mi)
452452+ }
453453+ return ms
454454+ }
455455+ return mi.MessageOf(x)
456456+}
457457+458458+// Deprecated: Use ArtifactData.ProtoReflect.Descriptor instead.
459459+func (*ArtifactData) Descriptor() ([]byte, []int) {
460460+ return file_loom_v1_loom_proto_rawDescGZIP(), []int{5}
461461+}
462462+463463+func (x *ArtifactData) GetSourceArchitecture() string {
464464+ if x != nil {
465465+ return x.SourceArchitecture
466466+ }
467467+ return ""
468468+}
469469+470470+func (x *ArtifactData) GetPath() string {
471471+ if x != nil {
472472+ return x.Path
473473+ }
474474+ return ""
475475+}
476476+477477+func (x *ArtifactData) GetData() []byte {
478478+ if x != nil {
479479+ return x.Data
480480+ }
481481+ return nil
482482+}
483483+484484+func (x *ArtifactData) GetEof() bool {
485485+ if x != nil {
486486+ return x.Eof
487487+ }
488488+ return false
489489+}
490490+491491+// Ack acknowledges receipt of a runner event.
492492+type Ack struct {
493493+ state protoimpl.MessageState `protogen:"open.v1"`
494494+ unknownFields protoimpl.UnknownFields
495495+ sizeCache protoimpl.SizeCache
496496+}
497497+498498+func (x *Ack) Reset() {
499499+ *x = Ack{}
500500+ mi := &file_loom_v1_loom_proto_msgTypes[6]
501501+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
502502+ ms.StoreMessageInfo(mi)
503503+}
504504+505505+func (x *Ack) String() string {
506506+ return protoimpl.X.MessageStringOf(x)
507507+}
508508+509509+func (*Ack) ProtoMessage() {}
510510+511511+func (x *Ack) ProtoReflect() protoreflect.Message {
512512+ mi := &file_loom_v1_loom_proto_msgTypes[6]
513513+ if x != nil {
514514+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
515515+ if ms.LoadMessageInfo() == nil {
516516+ ms.StoreMessageInfo(mi)
517517+ }
518518+ return ms
519519+ }
520520+ return mi.MessageOf(x)
521521+}
522522+523523+// Deprecated: Use Ack.ProtoReflect.Descriptor instead.
524524+func (*Ack) Descriptor() ([]byte, []int) {
525525+ return file_loom_v1_loom_proto_rawDescGZIP(), []int{6}
526526+}
527527+528528+var File_loom_v1_loom_proto protoreflect.FileDescriptor
529529+530530+const file_loom_v1_loom_proto_rawDesc = "" +
531531+ "\n" +
532532+ "\x12loom/v1/loom.proto\x12\aloom.v1\"\xae\x02\n" +
533533+ "\x0eConnectRequest\x12\x1f\n" +
534534+ "\vpipeline_id\x18\x01 \x01(\tR\n" +
535535+ "pipelineId\x12#\n" +
536536+ "\rworkflow_name\x18\x02 \x01(\tR\fworkflowName\x12\"\n" +
537537+ "\farchitecture\x18\x03 \x01(\tR\farchitecture\x129\n" +
538538+ "\fstep_control\x18\x04 \x01(\v2\x14.loom.v1.StepControlH\x00R\vstepControl\x12-\n" +
539539+ "\blog_line\x18\x05 \x01(\v2\x10.loom.v1.LogLineH\x00R\alogLine\x12?\n" +
540540+ "\x0eartifact_chunk\x18\x06 \x01(\v2\x16.loom.v1.ArtifactChunkH\x00R\rartifactChunkB\a\n" +
541541+ "\x05event\"z\n" +
542542+ "\x0fConnectResponse\x12 \n" +
543543+ "\x03ack\x18\x01 \x01(\v2\f.loom.v1.AckH\x00R\x03ack\x12<\n" +
544544+ "\rartifact_data\x18\x02 \x01(\v2\x15.loom.v1.ArtifactDataH\x00R\fartifactDataB\a\n" +
545545+ "\x05event\"[\n" +
546546+ "\vStepControl\x12\x17\n" +
547547+ "\astep_id\x18\x01 \x01(\x05R\x06stepId\x12\x16\n" +
548548+ "\x06status\x18\x02 \x01(\tR\x06status\x12\x1b\n" +
549549+ "\texit_code\x18\x03 \x01(\x05R\bexitCode\"T\n" +
550550+ "\aLogLine\x12\x17\n" +
551551+ "\astep_id\x18\x01 \x01(\x05R\x06stepId\x12\x16\n" +
552552+ "\x06stream\x18\x02 \x01(\tR\x06stream\x12\x18\n" +
553553+ "\acontent\x18\x03 \x01(\tR\acontent\"I\n" +
554554+ "\rArtifactChunk\x12\x12\n" +
555555+ "\x04path\x18\x01 \x01(\tR\x04path\x12\x12\n" +
556556+ "\x04data\x18\x02 \x01(\fR\x04data\x12\x10\n" +
557557+ "\x03eof\x18\x03 \x01(\bR\x03eof\"y\n" +
558558+ "\fArtifactData\x12/\n" +
559559+ "\x13source_architecture\x18\x01 \x01(\tR\x12sourceArchitecture\x12\x12\n" +
560560+ "\x04path\x18\x02 \x01(\tR\x04path\x12\x12\n" +
561561+ "\x04data\x18\x03 \x01(\fR\x04data\x12\x10\n" +
562562+ "\x03eof\x18\x04 \x01(\bR\x03eof\"\x05\n" +
563563+ "\x03Ack2U\n" +
564564+ "\x11LoomRunnerService\x12@\n" +
565565+ "\aConnect\x12\x17.loom.v1.ConnectRequest\x1a\x18.loom.v1.ConnectResponse(\x010\x01B6Z4tangled.org/evan.jarrett.net/loom/internal/pb/loomv1b\x06proto3"
566566+567567+var (
568568+ file_loom_v1_loom_proto_rawDescOnce sync.Once
569569+ file_loom_v1_loom_proto_rawDescData []byte
570570+)
571571+572572+func file_loom_v1_loom_proto_rawDescGZIP() []byte {
573573+ file_loom_v1_loom_proto_rawDescOnce.Do(func() {
574574+ file_loom_v1_loom_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_loom_v1_loom_proto_rawDesc), len(file_loom_v1_loom_proto_rawDesc)))
575575+ })
576576+ return file_loom_v1_loom_proto_rawDescData
577577+}
578578+579579+var file_loom_v1_loom_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
580580+var file_loom_v1_loom_proto_goTypes = []any{
581581+ (*ConnectRequest)(nil), // 0: loom.v1.ConnectRequest
582582+ (*ConnectResponse)(nil), // 1: loom.v1.ConnectResponse
583583+ (*StepControl)(nil), // 2: loom.v1.StepControl
584584+ (*LogLine)(nil), // 3: loom.v1.LogLine
585585+ (*ArtifactChunk)(nil), // 4: loom.v1.ArtifactChunk
586586+ (*ArtifactData)(nil), // 5: loom.v1.ArtifactData
587587+ (*Ack)(nil), // 6: loom.v1.Ack
588588+}
589589+var file_loom_v1_loom_proto_depIdxs = []int32{
590590+ 2, // 0: loom.v1.ConnectRequest.step_control:type_name -> loom.v1.StepControl
591591+ 3, // 1: loom.v1.ConnectRequest.log_line:type_name -> loom.v1.LogLine
592592+ 4, // 2: loom.v1.ConnectRequest.artifact_chunk:type_name -> loom.v1.ArtifactChunk
593593+ 6, // 3: loom.v1.ConnectResponse.ack:type_name -> loom.v1.Ack
594594+ 5, // 4: loom.v1.ConnectResponse.artifact_data:type_name -> loom.v1.ArtifactData
595595+ 0, // 5: loom.v1.LoomRunnerService.Connect:input_type -> loom.v1.ConnectRequest
596596+ 1, // 6: loom.v1.LoomRunnerService.Connect:output_type -> loom.v1.ConnectResponse
597597+ 6, // [6:7] is the sub-list for method output_type
598598+ 5, // [5:6] is the sub-list for method input_type
599599+ 5, // [5:5] is the sub-list for extension type_name
600600+ 5, // [5:5] is the sub-list for extension extendee
601601+ 0, // [0:5] is the sub-list for field type_name
602602+}
603603+604604+func init() { file_loom_v1_loom_proto_init() }
605605+func file_loom_v1_loom_proto_init() {
606606+ if File_loom_v1_loom_proto != nil {
607607+ return
608608+ }
609609+ file_loom_v1_loom_proto_msgTypes[0].OneofWrappers = []any{
610610+ (*ConnectRequest_StepControl)(nil),
611611+ (*ConnectRequest_LogLine)(nil),
612612+ (*ConnectRequest_ArtifactChunk)(nil),
613613+ }
614614+ file_loom_v1_loom_proto_msgTypes[1].OneofWrappers = []any{
615615+ (*ConnectResponse_Ack)(nil),
616616+ (*ConnectResponse_ArtifactData)(nil),
617617+ }
618618+ type x struct{}
619619+ out := protoimpl.TypeBuilder{
620620+ File: protoimpl.DescBuilder{
621621+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
622622+ RawDescriptor: unsafe.Slice(unsafe.StringData(file_loom_v1_loom_proto_rawDesc), len(file_loom_v1_loom_proto_rawDesc)),
623623+ NumEnums: 0,
624624+ NumMessages: 7,
625625+ NumExtensions: 0,
626626+ NumServices: 1,
627627+ },
628628+ GoTypes: file_loom_v1_loom_proto_goTypes,
629629+ DependencyIndexes: file_loom_v1_loom_proto_depIdxs,
630630+ MessageInfos: file_loom_v1_loom_proto_msgTypes,
631631+ }.Build()
632632+ File_loom_v1_loom_proto = out.File
633633+ file_loom_v1_loom_proto_goTypes = nil
634634+ file_loom_v1_loom_proto_depIdxs = nil
635635+}
+129
internal/pb/loom/v1/loom_grpc.pb.go
···11+// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
22+// versions:
33+// - protoc-gen-go-grpc v1.5.1
44+// - protoc (unknown)
55+// source: loom/v1/loom.proto
66+77+package loomv1
88+99+import (
1010+ context "context"
1111+ grpc "google.golang.org/grpc"
1212+ codes "google.golang.org/grpc/codes"
1313+ status "google.golang.org/grpc/status"
1414+)
1515+1616+// This is a compile-time assertion to ensure that this generated file
1717+// is compatible with the grpc package it is being compiled against.
1818+// Requires gRPC-Go v1.64.0 or later.
1919+const _ = grpc.SupportPackageIsVersion9
2020+2121+const (
2222+ LoomRunnerService_Connect_FullMethodName = "/loom.v1.LoomRunnerService/Connect"
2323+)
2424+2525+// LoomRunnerServiceClient is the client API for LoomRunnerService service.
2626+//
2727+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
2828+//
2929+// LoomRunnerService is the bidirectional communication channel between runner pods
3030+// and the Loom operator. Runners connect on startup and stream all events
3131+// (step control, log output, artifacts) over this single connection.
3232+type LoomRunnerServiceClient interface {
3333+ // Connect establishes a bidirectional stream between a runner and the operator.
3434+ // The runner identifies itself in the first message and then streams events.
3535+ // The operator sends acknowledgements and commands back.
3636+ Connect(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ConnectRequest, ConnectResponse], error)
3737+}
3838+3939+type loomRunnerServiceClient struct {
4040+ cc grpc.ClientConnInterface
4141+}
4242+4343+func NewLoomRunnerServiceClient(cc grpc.ClientConnInterface) LoomRunnerServiceClient {
4444+ return &loomRunnerServiceClient{cc}
4545+}
4646+4747+func (c *loomRunnerServiceClient) Connect(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ConnectRequest, ConnectResponse], error) {
4848+ cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
4949+ stream, err := c.cc.NewStream(ctx, &LoomRunnerService_ServiceDesc.Streams[0], LoomRunnerService_Connect_FullMethodName, cOpts...)
5050+ if err != nil {
5151+ return nil, err
5252+ }
5353+ x := &grpc.GenericClientStream[ConnectRequest, ConnectResponse]{ClientStream: stream}
5454+ return x, nil
5555+}
5656+5757+// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
5858+type LoomRunnerService_ConnectClient = grpc.BidiStreamingClient[ConnectRequest, ConnectResponse]
5959+6060+// LoomRunnerServiceServer is the server API for LoomRunnerService service.
6161+// All implementations must embed UnimplementedLoomRunnerServiceServer
6262+// for forward compatibility.
6363+//
6464+// LoomRunnerService is the bidirectional communication channel between runner pods
6565+// and the Loom operator. Runners connect on startup and stream all events
6666+// (step control, log output, artifacts) over this single connection.
6767+type LoomRunnerServiceServer interface {
6868+ // Connect establishes a bidirectional stream between a runner and the operator.
6969+ // The runner identifies itself in the first message and then streams events.
7070+ // The operator sends acknowledgements and commands back.
7171+ Connect(grpc.BidiStreamingServer[ConnectRequest, ConnectResponse]) error
7272+ mustEmbedUnimplementedLoomRunnerServiceServer()
7373+}
7474+7575+// UnimplementedLoomRunnerServiceServer must be embedded to have
7676+// forward compatible implementations.
7777+//
7878+// NOTE: this should be embedded by value instead of pointer to avoid a nil
7979+// pointer dereference when methods are called.
8080+type UnimplementedLoomRunnerServiceServer struct{}
8181+8282+func (UnimplementedLoomRunnerServiceServer) Connect(grpc.BidiStreamingServer[ConnectRequest, ConnectResponse]) error {
8383+ return status.Errorf(codes.Unimplemented, "method Connect not implemented")
8484+}
8585+func (UnimplementedLoomRunnerServiceServer) mustEmbedUnimplementedLoomRunnerServiceServer() {}
8686+func (UnimplementedLoomRunnerServiceServer) testEmbeddedByValue() {}
8787+8888+// UnsafeLoomRunnerServiceServer may be embedded to opt out of forward compatibility for this service.
8989+// Use of this interface is not recommended, as added methods to LoomRunnerServiceServer will
9090+// result in compilation errors.
9191+type UnsafeLoomRunnerServiceServer interface {
9292+ mustEmbedUnimplementedLoomRunnerServiceServer()
9393+}
9494+9595+func RegisterLoomRunnerServiceServer(s grpc.ServiceRegistrar, srv LoomRunnerServiceServer) {
9696+	// If the following call panics, it indicates UnimplementedLoomRunnerServiceServer was
9797+ // embedded by pointer and is nil. This will cause panics if an
9898+ // unimplemented method is ever invoked, so we test this at initialization
9999+ // time to prevent it from happening at runtime later due to I/O.
100100+ if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
101101+ t.testEmbeddedByValue()
102102+ }
103103+ s.RegisterService(&LoomRunnerService_ServiceDesc, srv)
104104+}
105105+106106+func _LoomRunnerService_Connect_Handler(srv interface{}, stream grpc.ServerStream) error {
107107+ return srv.(LoomRunnerServiceServer).Connect(&grpc.GenericServerStream[ConnectRequest, ConnectResponse]{ServerStream: stream})
108108+}
109109+110110+// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
111111+type LoomRunnerService_ConnectServer = grpc.BidiStreamingServer[ConnectRequest, ConnectResponse]
112112+113113+// LoomRunnerService_ServiceDesc is the grpc.ServiceDesc for LoomRunnerService service.
114114+// It's only intended for direct use with grpc.RegisterService,
115115+// and not to be introspected or modified (even as a copy)
116116+var LoomRunnerService_ServiceDesc = grpc.ServiceDesc{
117117+ ServiceName: "loom.v1.LoomRunnerService",
118118+ HandlerType: (*LoomRunnerServiceServer)(nil),
119119+ Methods: []grpc.MethodDesc{},
120120+ Streams: []grpc.StreamDesc{
121121+ {
122122+ StreamName: "Connect",
123123+ Handler: _LoomRunnerService_Connect_Handler,
124124+ ServerStreams: true,
125125+ ClientStreams: true,
126126+ },
127127+ },
128128+ Metadata: "loom/v1/loom.proto",
129129+}
+77
proto/loom/v1/loom.proto
···11+syntax = "proto3";
22+33+package loom.v1;
44+55+option go_package = "tangled.org/evan.jarrett.net/loom/internal/pb/loomv1";
66+77+// LoomRunnerService is the bidirectional communication channel between runner pods
88+// and the Loom operator. Runners connect on startup and stream all events
99+// (step control, log output, artifacts) over this single connection.
1010+service LoomRunnerService {
1111+ // Connect establishes a bidirectional stream between a runner and the operator.
1212+ // The runner identifies itself in the first message and then streams events.
1313+ // The operator sends acknowledgements and commands back.
1414+ rpc Connect(stream ConnectRequest) returns (stream ConnectResponse);
1515+}
1616+1717+// ConnectRequest is sent from the runner to the operator.
1818+message ConnectRequest {
1919+  // Identity fields — must be set on the first message, optional on subsequent messages.
2020+ string pipeline_id = 1;
2121+ string workflow_name = 2;
2222+ string architecture = 3;
2323+2424+ oneof event {
2525+ StepControl step_control = 4;
2626+ LogLine log_line = 5;
2727+ ArtifactChunk artifact_chunk = 6;
2828+ }
2929+}
3030+3131+// ConnectResponse is sent from the operator to the runner.
3232+message ConnectResponse {
3333+ oneof event {
3434+ Ack ack = 1;
3535+ ArtifactData artifact_data = 2;
3636+ }
3737+}
3838+3939+// StepControl signals step lifecycle transitions.
4040+message StepControl {
4141+ int32 step_id = 1;
4242+ // "start" or "end"
4343+ string status = 2;
4444+ // Exit code of the step command (only meaningful when status = "end").
4545+ int32 exit_code = 3;
4646+}
4747+4848+// LogLine carries a single line of step output.
4949+message LogLine {
5050+ int32 step_id = 1;
5151+ // "stdout" or "stderr"
5252+ string stream = 2;
5353+ string content = 3;
5454+}
5555+5656+// ArtifactChunk streams a file from runner to operator in chunks.
5757+// The runner sends one or more chunks per file, with eof=true on the last chunk.
5858+message ArtifactChunk {
5959+ // Relative path within the artifacts directory (e.g., "bin/myapp").
6060+ string path = 1;
6161+ bytes data = 2;
6262+ bool eof = 3;
6363+}
6464+6565+// ArtifactData streams artifact files from operator to runner (for final jobs).
6666+// The operator sends collected artifacts from matrix legs to the final runner.
6767+message ArtifactData {
6868+ // Architecture of the source matrix leg (e.g., "amd64").
6969+ string source_architecture = 1;
7070+ // Relative path within the artifacts directory.
7171+ string path = 2;
7272+ bytes data = 3;
7373+ bool eof = 4;
7474+}
7575+7676+// Ack acknowledges receipt of a runner event.
7777+message Ack {}