Kubernetes Operator for Tangled Spindles
use spindleset and configmap

+949 -261
+127 -25
api/v1alpha1/spindleset_types.go
···
 // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

 // SpindleSetSpec defines the desired state of SpindleSet.
+// SpindleSet is an internal resource created by the Loom engine to manage pipeline executions.
+// Users do not create SpindleSet resources directly.
 type SpindleSetSpec struct {
-	// KnotUrl is the URL of the tangled.org knot to connect to (e.g., https://tangled.org/@org/repo).
+	// PipelineRun contains pipeline-specific information for this pipeline execution.
+	// This SpindleSet is ephemeral and represents a single pipeline run.
 	// +kubebuilder:validation:Required
-	// +kubebuilder:validation:Pattern=`^https?://.*`
-	KnotUrl string `json:"knotUrl"`
+	PipelineRun *PipelineRunSpec `json:"pipelineRun"`
+
+	// Template is the pod template configuration for jobs in this SpindleSet.
+	// Set internally by the engine from ConfigMap configuration.
+	// +optional
+	Template SpindleTemplate `json:"template,omitempty"`
+}

-	// KnotAuthSecret is the name of the Secret containing authentication credentials for the knot.
-	// The secret should contain a key "token" with the auth token.
+// PipelineRunSpec defines the specification for a single pipeline execution.
+type PipelineRunSpec struct {
+	// PipelineID is the unique identifier for this pipeline run from the knot.
 	// +kubebuilder:validation:Required
-	KnotAuthSecret string `json:"knotAuthSecret"`
+	PipelineID string `json:"pipelineID"`
+
+	// Knot is the domain of the knot that triggered this pipeline.
+	// +kubebuilder:validation:Required
+	Knot string `json:"knot"`
+
+	// RepoURL is the Git repository URL to clone.
+	// +kubebuilder:validation:Required
+	RepoURL string `json:"repoURL"`
+
+	// CommitSHA is the Git commit to checkout.
+	// +kubebuilder:validation:Required
+	CommitSHA string `json:"commitSHA"`

-	// MaxConcurrentJobs is the maximum number of concurrent spindle jobs that can run.
-	// Defaults to 10 if not specified.
-	// +kubebuilder:default=10
-	// +kubebuilder:validation:Minimum=1
-	// +kubebuilder:validation:Maximum=100
+	// Workflows is the list of workflows to execute in this pipeline.
+	// +kubebuilder:validation:MinItems=1
+	Workflows []WorkflowSpec `json:"workflows"`
+}
+
+// WorkflowSpec defines a workflow to execute as part of a pipeline.
+// This is the canonical workflow definition that matches the .tangled/workflows/*.yaml format.
+type WorkflowSpec struct {
+	// Name is the workflow filename (e.g., "workflow-amd64.yaml").
+	// +kubebuilder:validation:Required
+	Name string `json:"name"`
+
+	// Image is the container image to use for executing the workflow steps.
+	// +kubebuilder:validation:Required
+	Image string `json:"image"`
+
+	// Architecture is the target architecture for this workflow (e.g., "amd64", "arm64").
+	// +kubebuilder:validation:Required
+	// +kubebuilder:validation:Enum=amd64;arm64
+	Architecture string `json:"architecture"`
+
+	// Steps is the ordered list of steps to execute in this workflow.
 	// +optional
-	MaxConcurrentJobs int32 `json:"maxConcurrentJobs,omitempty"`
+	Steps []WorkflowStep `json:"steps,omitempty"`
+
+	// When defines conditional execution rules for this workflow.
+	// +optional
+	When []WorkflowWhen `json:"when,omitempty"`
+
+	// Environment contains workflow-level environment variables.
+	// +optional
+	Environment map[string]string `json:"environment,omitempty"`

-	// Template is the default pod template configuration for spindle jobs.
-	// Individual workflows can override these settings.
+	// Dependencies specifies external dependencies for the workflow.
 	// +optional
-	Template SpindleTemplate `json:"template,omitempty"`
+	Dependencies *WorkflowDependencies `json:"dependencies,omitempty"`
+}
+
+// WorkflowStep defines a single step in a workflow.
+type WorkflowStep struct {
+	// Name is the human-readable name of the step.
+	// +kubebuilder:validation:Required
+	Name string `json:"name"`
+
+	// Command is the shell command to execute.
+	// +kubebuilder:validation:Required
+	Command string `json:"command"`
+
+	// Environment contains step-specific environment variables.
+	// +optional
+	Environment map[string]string `json:"environment,omitempty"`
+}
+
+// WorkflowWhen defines conditional execution rules.
+type WorkflowWhen struct {
+	// Event specifies which events trigger this workflow (e.g., "push", "pull_request").
+	// +optional
+	Event []string `json:"event,omitempty"`
+
+	// Branch specifies which branches trigger this workflow.
+	// +optional
+	Branch []string `json:"branch,omitempty"`
+}
+
+// WorkflowDependencies specifies external dependencies.
+type WorkflowDependencies struct {
+	// Nixpkgs is a list of Nix packages to make available.
+	// +optional
+	Nixpkgs []string `json:"nixpkgs,omitempty"`
 }

 // SpindleTemplate defines the pod template configuration for spindle jobs.
+// This is configured via ConfigMap and used internally by the engine.
 type SpindleTemplate struct {
 	// Resources defines the compute resource requirements for spindle jobs.
-	// +optional
 	Resources corev1.ResourceRequirements `json:"resources,omitempty"`

 	// NodeSelector is a selector which must be true for the pod to fit on a node.
-	// +optional
+	// For MVP, this is not exposed via ConfigMap.
 	NodeSelector map[string]string `json:"nodeSelector,omitempty"`

 	// Tolerations allows pods to schedule onto nodes with matching taints.
-	// +optional
+	// For MVP, this is not exposed via ConfigMap.
 	Tolerations []corev1.Toleration `json:"tolerations,omitempty"`

 	// Affinity defines scheduling constraints for spindle job pods.
-	// +optional
+	// For MVP, this is not exposed via ConfigMap.
 	Affinity *corev1.Affinity `json:"affinity,omitempty"`
 }
···
 	// +optional
 	FailedJobs int32 `json:"failedJobs,omitempty"`

-	// WebSocketConnected indicates whether the WebSocket connection to the knot is active.
+	// WorkflowStatuses tracks the status of individual workflows in a pipeline run.
 	// +optional
-	WebSocketConnected bool `json:"webSocketConnected,omitempty"`
+	WorkflowStatuses []WorkflowStatus `json:"workflowStatuses,omitempty"`

-	// LastEventTime is the timestamp of the last pipeline event received from the knot.
+	// Phase represents the current phase of the pipeline execution.
 	// +optional
-	LastEventTime *metav1.Time `json:"lastEventTime,omitempty"`
+	Phase string `json:"phase,omitempty"`
+}
+
+// WorkflowStatus tracks the status of a single workflow execution.
+type WorkflowStatus struct {
+	// Name is the workflow name.
+	Name string `json:"name"`
+
+	// JobName is the name of the Kubernetes Job created for this workflow.
+	// +optional
+	JobName string `json:"jobName,omitempty"`
+
+	// Phase is the current phase of the workflow (Pending, Running, Succeeded, Failed).
+	// +optional
+	Phase string `json:"phase,omitempty"`
+
+	// StartTime is when the workflow started executing.
+	// +optional
+	StartTime *metav1.Time `json:"startTime,omitempty"`
+
+	// CompletionTime is when the workflow finished.
+	// +optional
+	CompletionTime *metav1.Time `json:"completionTime,omitempty"`
 }

 // +kubebuilder:object:root=true
 // +kubebuilder:subresource:status
-// +kubebuilder:printcolumn:name="Knot URL",type=string,JSONPath=`.spec.knotUrl`
-// +kubebuilder:printcolumn:name="Connected",type=boolean,JSONPath=`.status.webSocketConnected`
+// +kubebuilder:printcolumn:name="Pipeline ID",type=string,JSONPath=`.spec.pipelineRun.pipelineID`
+// +kubebuilder:printcolumn:name="Phase",type=string,JSONPath=`.status.phase`
 // +kubebuilder:printcolumn:name="Running",type=integer,JSONPath=`.status.runningJobs`
 // +kubebuilder:printcolumn:name="Completed",type=integer,JSONPath=`.status.completedJobs`
 // +kubebuilder:printcolumn:name="Failed",type=integer,JSONPath=`.status.failedJobs`
 // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

 // SpindleSet is the Schema for the spindlesets API.
+// SpindleSet is an internal resource that represents a single pipeline execution.
+// It groups Jobs for a pipeline run and provides automatic cleanup via owner references.
 type SpindleSet struct {
 	metav1.TypeMeta   `json:",inline"`
 	metav1.ObjectMeta `json:"metadata,omitempty"`
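Although users never author these resources by hand, a concrete instance helps when reading the rest of the diff. A sketch of a SpindleSet as the engine might create it, shaped by the fields above — all names and values are hypothetical:

    apiVersion: loom.j5t.io/v1alpha1
    kind: SpindleSet
    metadata:
      name: spindle-3jxyz            # hypothetical; derived from the pipeline rkey
      labels:
        loom.j5t.io/component: spindle
        loom.j5t.io/pipeline-id: "3jxyz"
    spec:
      pipelineRun:
        pipelineID: "3jxyz"
        knot: knot1.tangled.sh
        repoURL: https://knot1.tangled.sh/@org/repo   # illustrative URL
        commitSHA: 0a1b2c3d
        workflows:
        - name: workflow-amd64.yaml
          image: golang:1.22          # illustrative image
          architecture: amd64
          steps:
          - name: build
            command: go build ./...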
+164 -3
api/v1alpha1/zz_generated.deepcopy.go
···
 )

 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *PipelineRunSpec) DeepCopyInto(out *PipelineRunSpec) {
+	*out = *in
+	if in.Workflows != nil {
+		in, out := &in.Workflows, &out.Workflows
+		*out = make([]WorkflowSpec, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PipelineRunSpec.
+func (in *PipelineRunSpec) DeepCopy() *PipelineRunSpec {
+	if in == nil {
+		return nil
+	}
+	out := new(PipelineRunSpec)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *SpindleSet) DeepCopyInto(out *SpindleSet) {
 	*out = *in
 	out.TypeMeta = in.TypeMeta
···
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *SpindleSetSpec) DeepCopyInto(out *SpindleSetSpec) {
 	*out = *in
+	if in.PipelineRun != nil {
+		in, out := &in.PipelineRun, &out.PipelineRun
+		*out = new(PipelineRunSpec)
+		(*in).DeepCopyInto(*out)
+	}
 	in.Template.DeepCopyInto(&out.Template)
 }
···
 			(*in)[i].DeepCopyInto(&(*out)[i])
 		}
 	}
-	if in.LastEventTime != nil {
-		in, out := &in.LastEventTime, &out.LastEventTime
-		*out = (*in).DeepCopy()
+	if in.WorkflowStatuses != nil {
+		in, out := &in.WorkflowStatuses, &out.WorkflowStatuses
+		*out = make([]WorkflowStatus, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
 	}
 }
···
 	in.DeepCopyInto(out)
 	return out
 }
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *WorkflowDependencies) DeepCopyInto(out *WorkflowDependencies) {
+	*out = *in
+	if in.Nixpkgs != nil {
+		in, out := &in.Nixpkgs, &out.Nixpkgs
+		*out = make([]string, len(*in))
+		copy(*out, *in)
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowDependencies.
+func (in *WorkflowDependencies) DeepCopy() *WorkflowDependencies {
+	if in == nil {
+		return nil
+	}
+	out := new(WorkflowDependencies)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *WorkflowSpec) DeepCopyInto(out *WorkflowSpec) {
+	*out = *in
+	if in.Steps != nil {
+		in, out := &in.Steps, &out.Steps
+		*out = make([]WorkflowStep, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	if in.When != nil {
+		in, out := &in.When, &out.When
+		*out = make([]WorkflowWhen, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	if in.Environment != nil {
+		in, out := &in.Environment, &out.Environment
+		*out = make(map[string]string, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+	if in.Dependencies != nil {
+		in, out := &in.Dependencies, &out.Dependencies
+		*out = new(WorkflowDependencies)
+		(*in).DeepCopyInto(*out)
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowSpec.
+func (in *WorkflowSpec) DeepCopy() *WorkflowSpec {
+	if in == nil {
+		return nil
+	}
+	out := new(WorkflowSpec)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *WorkflowStatus) DeepCopyInto(out *WorkflowStatus) {
+	*out = *in
+	if in.StartTime != nil {
+		in, out := &in.StartTime, &out.StartTime
+		*out = (*in).DeepCopy()
+	}
+	if in.CompletionTime != nil {
+		in, out := &in.CompletionTime, &out.CompletionTime
+		*out = (*in).DeepCopy()
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowStatus.
+func (in *WorkflowStatus) DeepCopy() *WorkflowStatus {
+	if in == nil {
+		return nil
+	}
+	out := new(WorkflowStatus)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *WorkflowStep) DeepCopyInto(out *WorkflowStep) {
+	*out = *in
+	if in.Environment != nil {
+		in, out := &in.Environment, &out.Environment
+		*out = make(map[string]string, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowStep.
+func (in *WorkflowStep) DeepCopy() *WorkflowStep {
+	if in == nil {
+		return nil
+	}
+	out := new(WorkflowStep)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *WorkflowWhen) DeepCopyInto(out *WorkflowWhen) {
+	*out = *in
+	if in.Event != nil {
+		in, out := &in.Event, &out.Event
+		*out = make([]string, len(*in))
+		copy(*out, *in)
+	}
+	if in.Branch != nil {
+		in, out := &in.Branch, &out.Branch
+		*out = make([]string, len(*in))
+		copy(*out, *in)
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowWhen.
+func (in *WorkflowWhen) DeepCopy() *WorkflowWhen {
+	if in == nil {
+		return nil
+	}
+	out := new(WorkflowWhen)
+	in.DeepCopyInto(out)
+	return out
+}
+55 -3
cmd/main.go
···
 	// to ensure that exec-entrypoint and run can make use of them.
 	_ "k8s.io/client-go/plugin/pkg/client/auth"

+	"gopkg.in/yaml.v3"
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
···
 	// +kubebuilder:scaffold:imports
 )

+// LoomConfig holds configuration from the ConfigMap
+type LoomConfig struct {
+	MaxConcurrentJobs int                `yaml:"maxConcurrentJobs"`
+	Template          LoomTemplateConfig `yaml:"template"`
+}
+
+// LoomTemplateConfig holds job template configuration
+type LoomTemplateConfig struct {
+	Resources corev1.ResourceRequirements `yaml:"resources"`
+}
+
 var (
 	scheme   = runtime.NewScheme()
 	setupLog = ctrl.Log.WithName("setup")
···
 	// +kubebuilder:scaffold:scheme
 }

+// loadLoomConfig loads configuration from the ConfigMap file
+func loadLoomConfig(configPath string) (*LoomConfig, error) {
+	// Read config file
+	data, err := os.ReadFile(configPath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read config file: %w", err)
+	}
+
+	// Parse YAML
+	var cfg LoomConfig
+	if err := yaml.Unmarshal(data, &cfg); err != nil {
+		return nil, fmt.Errorf("failed to parse config: %w", err)
+	}
+
+	return &cfg, nil
+}
+
 // initializeSpindle creates a spindle server with KubernetesEngine
-func initializeSpindle(ctx context.Context, cfg *config.Config, mgr ctrl.Manager) (*spindle.Spindle, error) {
+func initializeSpindle(ctx context.Context, cfg *config.Config, mgr ctrl.Manager, loomCfg *LoomConfig) (*spindle.Spindle, error) {
 	// Initialize Kubernetes engine
 	// Get namespace from environment (injected via Downward API)
 	namespace := os.Getenv("POD_NAMESPACE")
 	if namespace == "" {
 		namespace = "default"
 	}
-	kubeEngine := engine.NewKubernetesEngine(mgr.GetClient(), namespace, loomv1alpha1.SpindleTemplate{})
+
+	// Create template from loom config
+	template := loomv1alpha1.SpindleTemplate{
+		Resources: loomCfg.Template.Resources,
+	}
+
+	kubeEngine := engine.NewKubernetesEngine(mgr.GetClient(), mgr.GetConfig(), namespace, template)

 	// Create engines map with kubernetes engine
 	engines := map[string]models.Engine{
···
 	// Create context for spindle initialization
 	ctx := context.Background()

+	// Load loom configuration from ConfigMap
+	loomCfg, err := loadLoomConfig("/etc/loom/config.yaml")
+	if err != nil {
+		setupLog.Error(err, "failed to load loom config")
+		os.Exit(1)
+	}
+	setupLog.Info("Loom configuration loaded",
+		"maxConcurrentJobs", loomCfg.MaxConcurrentJobs,
+		"cpuRequest", loomCfg.Template.Resources.Requests.Cpu().String(),
+		"memoryRequest", loomCfg.Template.Resources.Requests.Memory().String())
+
 	// Load spindle configuration from environment
 	spindleCfg, err := config.Load(ctx)
 	if err != nil {
 		setupLog.Error(err, "failed to load spindle config")
 		os.Exit(1)
+	}
+
+	// Override maxJobCount from loom config
+	if loomCfg.MaxConcurrentJobs > 0 {
+		spindleCfg.Server.MaxJobCount = loomCfg.MaxConcurrentJobs
 	}

 	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
···
 	}

 	// Initialize spindle server with KubernetesEngine
-	s, err := initializeSpindle(ctx, spindleCfg, mgr)
+	s, err := initializeSpindle(ctx, spindleCfg, mgr, loomCfg)
 	if err != nil {
 		setupLog.Error(err, "failed to initialize spindle")
 		os.Exit(1)
+174 -44
config/crd/bases/loom.j5t.io_spindlesets.yaml
···
   scope: Namespaced
   versions:
   - additionalPrinterColumns:
-    - jsonPath: .spec.knotUrl
-      name: Knot URL
+    - jsonPath: .spec.pipelineRun.pipelineID
+      name: Pipeline ID
       type: string
-    - jsonPath: .status.webSocketConnected
-      name: Connected
-      type: boolean
+    - jsonPath: .status.phase
+      name: Phase
+      type: string
     - jsonPath: .status.runningJobs
       name: Running
       type: integer
···
     name: v1alpha1
     schema:
       openAPIV3Schema:
-        description: SpindleSet is the Schema for the spindlesets API.
+        description: |-
+          SpindleSet is the Schema for the spindlesets API.
+          SpindleSet is an internal resource that represents a single pipeline execution.
+          It groups Jobs for a pipeline run and provides automatic cleanup via owner references.
         properties:
           apiVersion:
             description: |-
···
           metadata:
             type: object
           spec:
-            description: SpindleSetSpec defines the desired state of SpindleSet.
+            description: |-
+              SpindleSetSpec defines the desired state of SpindleSet.
+              SpindleSet is an internal resource created by the Loom engine to manage pipeline executions.
+              Users do not create SpindleSet resources directly.
             properties:
-              knotAuthSecret:
+              pipelineRun:
                 description: |-
-                  KnotAuthSecret is the name of the Secret containing authentication credentials for the knot.
-                  The secret should contain a key "token" with the auth token.
-                type: string
-              knotUrl:
-                description: KnotUrl is the URL of the tangled.org knot to connect
-                  to (e.g., https://tangled.org/@org/repo).
-                pattern: ^https?://.*
-                type: string
-              maxConcurrentJobs:
-                default: 10
-                description: |-
-                  MaxConcurrentJobs is the maximum number of concurrent spindle jobs that can run.
-                  Defaults to 10 if not specified.
-                format: int32
-                maximum: 100
-                minimum: 1
-                type: integer
+                  PipelineRun contains pipeline-specific information for this pipeline execution.
+                  This SpindleSet is ephemeral and represents a single pipeline run.
+                properties:
+                  commitSHA:
+                    description: CommitSHA is the Git commit to checkout.
+                    type: string
+                  knot:
+                    description: Knot is the domain of the knot that triggered this
+                      pipeline.
+                    type: string
+                  pipelineID:
+                    description: PipelineID is the unique identifier for this pipeline
+                      run from the knot.
+                    type: string
+                  repoURL:
+                    description: RepoURL is the Git repository URL to clone.
+                    type: string
+                  workflows:
+                    description: Workflows is the list of workflows to execute in
+                      this pipeline.
+                    items:
+                      description: |-
+                        WorkflowSpec defines a workflow to execute as part of a pipeline.
+                        This is the canonical workflow definition that matches the .tangled/workflows/*.yaml format.
+                      properties:
+                        architecture:
+                          description: Architecture is the target architecture for
+                            this workflow (e.g., "amd64", "arm64").
+                          enum:
+                          - amd64
+                          - arm64
+                          type: string
+                        dependencies:
+                          description: Dependencies specifies external dependencies
+                            for the workflow.
+                          properties:
+                            nixpkgs:
+                              description: Nixpkgs is a list of Nix packages to make
+                                available.
+                              items:
+                                type: string
+                              type: array
+                          type: object
+                        environment:
+                          additionalProperties:
+                            type: string
+                          description: Environment contains workflow-level environment
+                            variables.
+                          type: object
+                        image:
+                          description: Image is the container image to use for executing
+                            the workflow steps.
+                          type: string
+                        name:
+                          description: Name is the workflow filename (e.g., "workflow-amd64.yaml").
+                          type: string
+                        steps:
+                          description: Steps is the ordered list of steps to execute
+                            in this workflow.
+                          items:
+                            description: WorkflowStep defines a single step in a workflow.
+                            properties:
+                              command:
+                                description: Command is the shell command to execute.
+                                type: string
+                              environment:
+                                additionalProperties:
+                                  type: string
+                                description: Environment contains step-specific environment
+                                  variables.
+                                type: object
+                              name:
+                                description: Name is the human-readable name of the
+                                  step.
+                                type: string
+                            required:
+                            - command
+                            - name
+                            type: object
+                          type: array
+                        when:
+                          description: When defines conditional execution rules for
+                            this workflow.
+                          items:
+                            description: WorkflowWhen defines conditional execution
+                              rules.
+                            properties:
+                              branch:
+                                description: Branch specifies which branches trigger
+                                  this workflow.
+                                items:
+                                  type: string
+                                type: array
+                              event:
+                                description: Event specifies which events trigger
+                                  this workflow (e.g., "push", "pull_request").
+                                items:
+                                  type: string
+                                type: array
+                            type: object
+                          type: array
+                      required:
+                      - architecture
+                      - image
+                      - name
+                      type: object
+                    minItems: 1
+                    type: array
+                required:
+                - commitSHA
+                - knot
+                - pipelineID
+                - repoURL
+                - workflows
+                type: object
               template:
                 description: |-
-                  Template is the default pod template configuration for spindle jobs.
-                  Individual workflows can override these settings.
+                  Template is the pod template configuration for jobs in this SpindleSet.
+                  Set internally by the engine from ConfigMap configuration.
                 properties:
                   affinity:
-                    description: Affinity defines scheduling constraints for spindle
-                      job pods.
+                    description: |-
+                      Affinity defines scheduling constraints for spindle job pods.
+                      For MVP, this is not exposed via ConfigMap.
                     properties:
                       nodeAffinity:
                         description: Describes node affinity scheduling rules for
···
                   nodeSelector:
                     additionalProperties:
                       type: string
-                    description: NodeSelector is a selector which must be true for
-                      the pod to fit on a node.
+                    description: |-
+                      NodeSelector is a selector which must be true for the pod to fit on a node.
+                      For MVP, this is not exposed via ConfigMap.
                     type: object
                   resources:
                     description: Resources defines the compute resource requirements
···
                         type: object
                     type: object
                   tolerations:
-                    description: Tolerations allows pods to schedule onto nodes with
-                      matching taints.
+                    description: |-
+                      Tolerations allows pods to schedule onto nodes with matching taints.
+                      For MVP, this is not exposed via ConfigMap.
                     items:
                       description: |-
                         The pod this Toleration is attached to tolerates any taint that matches
···
                     type: array
                 type: object
             required:
-            - knotAuthSecret
-            - knotUrl
+            - pipelineRun
             type: object
           status:
             description: SpindleSetStatus defines the observed state of SpindleSet.
···
                     failed.
                 format: int32
                 type: integer
-              lastEventTime:
-                description: LastEventTime is the timestamp of the last pipeline event
-                  received from the knot.
-                format: date-time
-                type: string
               pendingJobs:
                 description: PendingJobs is the number of spindle jobs currently pending.
                 format: int32
                 type: integer
+              phase:
+                description: Phase represents the current phase of the pipeline execution.
+                type: string
               runningJobs:
                 description: RunningJobs is the number of spindle jobs currently running.
                 format: int32
                 type: integer
-              webSocketConnected:
-                description: WebSocketConnected indicates whether the WebSocket connection
-                  to the knot is active.
-                type: boolean
+              workflowStatuses:
+                description: WorkflowStatuses tracks the status of individual workflows
+                  in a pipeline run.
+                items:
+                  description: WorkflowStatus tracks the status of a single workflow
+                    execution.
+                  properties:
+                    completionTime:
+                      description: CompletionTime is when the workflow finished.
+                      format: date-time
+                      type: string
+                    jobName:
+                      description: JobName is the name of the Kubernetes Job created
+                        for this workflow.
+                      type: string
+                    name:
+                      description: Name is the workflow name.
+                      type: string
+                    phase:
+                      description: Phase is the current phase of the workflow (Pending,
+                        Running, Succeeded, Failed).
+                      type: string
+                    startTime:
+                      description: StartTime is when the workflow started executing.
+                      format: date-time
+                      type: string
+                  required:
+                  - name
+                  type: object
+                type: array
             type: object
         type: object
     served: true
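With the new printer columns, a listing of SpindleSets would look roughly like the sketch below; resource names, counts, and ages are illustrative, not taken from a real cluster:

    $ kubectl get spindlesets
    NAME            PIPELINE ID   PHASE     RUNNING   COMPLETED   FAILED   AGE
    spindle-3jxyz   3jxyz         Running   1         0           0        2m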
+1
config/manager/kustomization.yaml
···
 - manager.yaml
 - service.yaml
 - pvc.yaml
+- loom-config.yaml
 apiVersion: kustomize.config.k8s.io/v1beta1
 kind: Kustomization
 images:
+19
config/manager/loom-config.yaml
···
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: loom-config
+  namespace: system
+data:
+  config.yaml: |
+    # Maximum number of concurrent spindle jobs
+    maxConcurrentJobs: 5
+
+    # Default template for spindle job pods
+    template:
+      resources:
+        requests:
+          cpu: "500m"
+          memory: "1Gi"
+        limits:
+          cpu: "2"
+          memory: "4Gi"
+6
config/manager/manager.yaml
···
           mountPath: /tmp/spindle-logs
         - name: spindle-db
           mountPath: /data
+        - name: loom-config
+          mountPath: /etc/loom
+          readOnly: true
       volumes:
       - name: spindle-logs
         emptyDir: {}
       - name: spindle-db
         persistentVolumeClaim:
           claimName: spindle-db
+      - name: loom-config
+        configMap:
+          name: loom-config
       serviceAccountName: controller-manager
       terminationGracePeriodSeconds: 10
+2 -2
config/samples/kustomization.yaml
···
 ## Append samples of your project ##
-resources:
-- loom_v1alpha1_spindleset.yaml
+# No samples needed - SpindleSet is internal-only
+resources: []
 # +kubebuilder:scaffold:manifestskustomizesamples
-35
config/samples/loom_v1alpha1_spindleset.yaml
···
-apiVersion: loom.j5t.io/v1alpha1
-kind: SpindleSet
-metadata:
-  labels:
-    app.kubernetes.io/name: loom
-    app.kubernetes.io/managed-by: kustomize
-  name: spindleset-sample
-spec:
-  # URL of the tangled.org knot to connect to
-  knotUrl: https://knot1.tangled.sh
-
-  # Name of the Secret containing the auth token (key: token)
-  knotAuthSecret: spindle-auth
-
-  # Maximum number of concurrent spindle jobs (default: 10)
-  maxConcurrentJobs: 5
-
-  # Default template for spindle job pods
-  template:
-    resources:
-      requests:
-        cpu: "500m"
-        memory: "1Gi"
-      limits:
-        cpu: "2"
-        memory: "4Gi"
-
-  # Optional: Node selector for pod placement
-  nodeSelector: {}
-
-  # Optional: Tolerations for scheduling
-  tolerations: []
-
-  # Optional: Pod affinity/anti-affinity rules
-  affinity: {}
+140 -61
internal/controller/spindleset_controller.go
···
 	"tangled.org/core/spindle/models"

 	batchv1 "k8s.io/api/batch/v1"
-	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/client-go/rest"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
···
 	"sigs.k8s.io/controller-runtime/pkg/log"

 	loomv1alpha1 "tangled.org/evan.jarrett.net/loom/api/v1alpha1"
+	"tangled.org/evan.jarrett.net/loom/pkg/jobbuilder"
 )

 // SpindleSetReconciler reconciles a SpindleSet object
···
 		return ctrl.Result{}, err
 	}

-	logger.Info("Reconciling SpindleSet", "name", spindleSet.Name, "knotUrl", spindleSet.Spec.KnotUrl)
+	logger.Info("Reconciling SpindleSet", "name", spindleSet.Name, "pipelineID", spindleSet.Spec.PipelineRun.PipelineID)

 	// Add finalizer if not present
 	if !controllerutil.ContainsFinalizer(spindleSet, "loom.j5t.io/finalizer") {
···
 		return r.handleDeletion(ctx, spindleSet)
 	}

-	// Ensure spindle HTTP Service exists
-	if err := r.ensureSpindleService(ctx, spindleSet); err != nil {
-		logger.Error(err, "Failed to ensure spindle service")
-		// Don't return error - we'll retry on next reconcile
+	// Ensure Jobs are created for workflows
+	if err := r.ensurePipelineJobs(ctx, spindleSet); err != nil {
+		logger.Error(err, "Failed to ensure pipeline Jobs")
+		return ctrl.Result{}, err
 	}

-	// Monitor Job statuses and report to spindle DB
+	// Monitor Job statuses
 	if err := r.monitorJobStatuses(ctx, spindleSet); err != nil {
 		logger.Error(err, "Failed to monitor job statuses")
+		// Don't return error - we'll retry on next reconcile
+	}
+
+	// Clean up orphaned Jobs in this namespace
+	if err := r.cleanupOrphanedJobs(ctx, spindleSet.Namespace); err != nil {
+		logger.Error(err, "Failed to cleanup orphaned Jobs")
 		// Don't return error - we'll retry on next reconcile
 	}
···
 	*conditions = append(*conditions, newCondition)
 }

-// ensureSpindleService ensures a Service exists to expose the spindle HTTP server
-func (r *SpindleSetReconciler) ensureSpindleService(ctx context.Context, spindleSet *loomv1alpha1.SpindleSet) error {
+// ensurePipelineJobs ensures Jobs are created for all workflows in a pipeline run
+func (r *SpindleSetReconciler) ensurePipelineJobs(ctx context.Context, spindleSet *loomv1alpha1.SpindleSet) error {
 	logger := log.FromContext(ctx)

-	// Service name based on SpindleSet
-	serviceName := fmt.Sprintf("%s-spindle", spindleSet.Name)
+	pipelineRun := spindleSet.Spec.PipelineRun
+
+	// Convert workflow steps to jobbuilder format and create Jobs for each workflow
+	for _, workflowSpec := range pipelineRun.Workflows {
+		// Check if Job already exists
+		jobName := fmt.Sprintf("spindle-%s-%s", pipelineRun.PipelineID, workflowSpec.Name)
+		if len(jobName) > 63 {
+			jobName = jobName[:63]
+		}
+
+		existingJob := &batchv1.Job{}
+		err := r.Get(ctx, client.ObjectKey{
+			Name:      jobName,
+			Namespace: spindleSet.Namespace,
+		}, existingJob)
+
+		if err == nil {
+			// Job already exists
+			logger.V(1).Info("Job already exists for workflow", "workflow", workflowSpec.Name, "job", jobName)
+			continue
+		}
+
+		if !apierrors.IsNotFound(err) {
+			return fmt.Errorf("failed to check for existing job: %w", err)
+		}
+
+		// Convert workflow steps to jobbuilder format
+		jobSteps := make([]jobbuilder.WorkflowStep, 0, len(workflowSpec.Steps))
+		for _, step := range workflowSpec.Steps {
+			jobSteps = append(jobSteps, jobbuilder.WorkflowStep{
+				Name:    step.Name,
+				Command: step.Command,
+				Env:     step.Environment,
+			})
+		}

-	// Check if Service already exists
-	existingService := &corev1.Service{}
-	err := r.Get(ctx, client.ObjectKey{
-		Name:      serviceName,
-		Namespace: spindleSet.Namespace,
-	}, existingService)
+		// Build Job configuration
+		jobConfig := jobbuilder.WorkflowConfig{
+			WorkflowName:   workflowSpec.Name,
+			PipelineID:     pipelineRun.PipelineID,
+			SpindleSetName: spindleSet.Name,
+			Image:          workflowSpec.Image,
+			Architecture:   workflowSpec.Architecture,
+			Steps:          jobSteps,
+			RepoURL:        pipelineRun.RepoURL,
+			CommitSHA:      pipelineRun.CommitSHA,
+			Secrets:        nil, // TODO: Handle secrets
+			Template:       spindleSet.Spec.Template,
+			Namespace:      spindleSet.Namespace,
+			Knot:           pipelineRun.Knot,
+		}
+
+		// Create the Job
+		job, err := jobbuilder.BuildJob(jobConfig)
+		if err != nil {
+			return fmt.Errorf("failed to build job for workflow %s: %w", workflowSpec.Name, err)
+		}

-	if err == nil {
-		// Service already exists
-		logger.V(1).Info("Spindle service already exists", "service", serviceName)
-		return nil
+		// Set SpindleSet as owner of the Job
+		if err := controllerutil.SetControllerReference(spindleSet, job, r.Scheme); err != nil {
+			return fmt.Errorf("failed to set controller reference: %w", err)
+		}
+
+		logger.Info("Creating Job for workflow", "workflow", workflowSpec.Name, "job", job.Name)
+		if err := r.Create(ctx, job); err != nil {
+			return fmt.Errorf("failed to create job for workflow %s: %w", workflowSpec.Name, err)
+		}
+
+		logger.Info("Job created successfully", "workflow", workflowSpec.Name, "job", job.Name)
 	}

-	if !apierrors.IsNotFound(err) {
-		return fmt.Errorf("failed to get service: %w", err)
+	return nil
+}
+
+// cleanupOrphanedJobs cleans up Jobs without a matching SpindleSet
+func (r *SpindleSetReconciler) cleanupOrphanedJobs(ctx context.Context, namespace string) error {
+	logger := log.FromContext(ctx)
+
+	// List all spindle Jobs in the namespace
+	jobList := &batchv1.JobList{}
+	if err := r.List(ctx, jobList,
+		client.InNamespace(namespace),
+		client.MatchingLabels{"loom.j5t.io/component": "spindle"}); err != nil {
+		return fmt.Errorf("failed to list Jobs: %w", err)
 	}

-	// Create new Service
-	service := &corev1.Service{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      serviceName,
-			Namespace: spindleSet.Namespace,
-			Labels: map[string]string{
-				"loom.j5t.io/component":  "spindle-http",
-				"loom.j5t.io/spindleset": spindleSet.Name,
-			},
-		},
-		Spec: corev1.ServiceSpec{
-			Selector: map[string]string{
-				// Select the loom operator pod
-				// This assumes the operator deployment has this label
-				"control-plane": "controller-manager",
-			},
-			Ports: []corev1.ServicePort{
-				{
-					Name:       "http",
-					Protocol:   corev1.ProtocolTCP,
-					Port:       6555, // Default spindle HTTP port
-					TargetPort: intstr.FromInt(6555),
-				},
-			},
-			Type: corev1.ServiceTypeClusterIP,
-		},
-	}
+	for _, job := range jobList.Items {
+		spindleSetName := job.Labels["loom.j5t.io/spindleset"]
+		if spindleSetName == "" {
+			// Job missing spindleset label - this is an orphan from old code
+			logger.Info("Found Job without spindleset label, deleting", "job", job.Name)
+			if err := r.Delete(ctx, &job); client.IgnoreNotFound(err) != nil {
+				logger.Error(err, "Failed to delete orphaned Job", "job", job.Name)
+			}
+			continue
+		}
+
+		// Check if SpindleSet still exists
+		spindleSet := &loomv1alpha1.SpindleSet{}
+		err := r.Get(ctx, client.ObjectKey{
+			Name:      spindleSetName,
+			Namespace: namespace,
+		}, spindleSet)
+
+		if apierrors.IsNotFound(err) {
+			// SpindleSet was deleted but Job remains - this is an orphan
+			logger.Info("Found orphaned Job, SpindleSet no longer exists", "job", job.Name, "spindleset", spindleSetName)
+			if err := r.Delete(ctx, &job); client.IgnoreNotFound(err) != nil {
+				logger.Error(err, "Failed to delete orphaned Job", "job", job.Name)
+			}
+			continue
+		}

-	// Set SpindleSet as owner
-	if err := controllerutil.SetControllerReference(spindleSet, service, r.Scheme); err != nil {
-		return fmt.Errorf("failed to set controller reference: %w", err)
-	}
+		if err != nil {
+			// Error checking SpindleSet - skip this Job
+			logger.Error(err, "Failed to check SpindleSet existence", "spindleset", spindleSetName)
+			continue
+		}

-	logger.Info("Creating spindle HTTP service", "service", serviceName, "port", 6555)
-	if err := r.Create(ctx, service); err != nil {
-		return fmt.Errorf("failed to create service: %w", err)
+		// Job has a valid SpindleSet, check if it should be cleaned up by age
+		// This is a backup for the TTL controller
+		if job.Status.CompletionTime != nil {
+			// Job is complete, check age
+			age := metav1.Now().Sub(job.Status.CompletionTime.Time)
+			// TTL is 3600 seconds (1 hour), add buffer and clean up after 2 hours
+			if age.Hours() > 2 {
+				logger.Info("Cleaning up old completed Job", "job", job.Name, "age", age)
+				if err := r.Delete(ctx, &job); client.IgnoreNotFound(err) != nil {
+					logger.Error(err, "Failed to delete old Job", "job", job.Name)
+				}
+			}
+		}
 	}

-	logger.Info("Spindle HTTP service created successfully", "service", serviceName)
 	return nil
 }
···
 func (r *SpindleSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&loomv1alpha1.SpindleSet{}).
-		Owns(&batchv1.Job{}).    // Watch Jobs owned by SpindleSet
-		Owns(&corev1.Service{}). // Watch Services owned by SpindleSet
+		Owns(&batchv1.Job{}). // Watch Jobs owned by SpindleSet
 		Named("spindleset").
 		Complete(r)
 }
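Because the controller calls SetControllerReference before creating each Job, cleanup of Jobs piggybacks on Kubernetes garbage collection when a SpindleSet is deleted. A sketch of the ownerReferences stanza that would land on a created Job (names illustrative, uid omitted; controller-runtime sets both flags to true):

    metadata:
      name: spindle-3jxyz-workflow-amd64.yaml   # hypothetical Job name
      ownerReferences:
      - apiVersion: loom.j5t.io/v1alpha1
        kind: SpindleSet
        name: spindle-3jxyz
        controller: true
        blockOwnerDeletion: true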
+252 -88
internal/engine/kubernetes_engine.go
···
 package engine

 import (
+	"bufio"
 	"context"
 	"fmt"
+	"io"
+	"regexp"
+	"strconv"
+	"strings"
 	"time"

 	"gopkg.in/yaml.v3"
 	batchv1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/log"
···
 	"tangled.org/core/spindle/secrets"

 	loomv1alpha1 "tangled.org/evan.jarrett.net/loom/api/v1alpha1"
-	"tangled.org/evan.jarrett.net/loom/pkg/jobbuilder"
 )

 // KubernetesEngine implements the spindle Engine interface for Kubernetes Jobs.
 type KubernetesEngine struct {
 	client    client.Client
+	config    *rest.Config
 	namespace string
 	template  loomv1alpha1.SpindleTemplate

-	// Track created jobs for cleanup
-	jobs map[string]*batchv1.Job
+	// Track created SpindleSets for cleanup
+	spindleSets map[string]*loomv1alpha1.SpindleSet

 	// Store current knot for Job annotations
 	currentKnot string
 }

 // NewKubernetesEngine creates a new Kubernetes-based spindle engine.
-func NewKubernetesEngine(client client.Client, namespace string, template loomv1alpha1.SpindleTemplate) *KubernetesEngine {
+func NewKubernetesEngine(client client.Client, config *rest.Config, namespace string, template loomv1alpha1.SpindleTemplate) *KubernetesEngine {
 	return &KubernetesEngine{
-		client:    client,
-		namespace: namespace,
-		template:  template,
-		jobs:      make(map[string]*batchv1.Job),
+		client:      client,
+		config:      config,
+		namespace:   namespace,
+		template:    template,
+		spindleSets: make(map[string]*loomv1alpha1.SpindleSet),
 	}
 }

-// WorkflowSpec is the structure of the workflow YAML file.
-type WorkflowSpec struct {
-	Image        string                    `yaml:"image"`
-	Architecture string                    `yaml:"architecture"`
-	Steps        []WorkflowStepSpec        `yaml:"steps"`
-	When         []WorkflowWhenSpec        `yaml:"when,omitempty"`
-	Environment  map[string]string         `yaml:"environment,omitempty"`
-	Dependencies *WorkflowDependenciesSpec `yaml:"dependencies,omitempty"`
-}
-
-type WorkflowStepSpec struct {
-	Name        string            `yaml:"name"`
-	Command     string            `yaml:"command"`
-	Environment map[string]string `yaml:"environment,omitempty"`
-}
-
-type WorkflowWhenSpec struct {
-	Event  []string `yaml:"event,omitempty"`
-	Branch []string `yaml:"branch,omitempty"`
-}
-
-type WorkflowDependenciesSpec struct {
-	Nixpkgs []string `yaml:"nixpkgs,omitempty"`
-}
-
 // SimpleStep implements the models.Step interface.
 type SimpleStep struct {
 	StepName string
···
 // InitWorkflow parses the workflow YAML and initializes a Workflow model.
 func (e *KubernetesEngine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
-	// Parse the Raw YAML
-	var spec WorkflowSpec
+	// Parse the Raw YAML into the unified WorkflowSpec type
+	var spec loomv1alpha1.WorkflowSpec
 	if err := yaml.Unmarshal([]byte(twf.Raw), &spec); err != nil {
 		return nil, fmt.Errorf("failed to parse workflow YAML: %w", err)
 	}

+	// Set the workflow name from the tangled workflow
+	spec.Name = twf.Name
+
 	// Validate required fields
 	if spec.Image == "" {
 		return nil, fmt.Errorf("workflow must specify an 'image' field")
···
 	return workflow, nil
 }

-// SetupWorkflow creates the Kubernetes Job for the workflow.
+// SetupWorkflow creates a SpindleSet CR for the workflow.
 func (e *KubernetesEngine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
 	logger := log.FromContext(ctx).WithValues("workflow", wid.Name, "pipeline", wid.PipelineId.Rkey)
···
 		return fmt.Errorf("invalid workflow data type")
 	}

-	spec, ok := workflowData["spec"].(WorkflowSpec)
+	spec, ok := workflowData["spec"].(loomv1alpha1.WorkflowSpec)
 	if !ok {
 		return fmt.Errorf("workflow spec not found in data")
 	}
···
 	// Store knot for status reporting
 	e.currentKnot = triggerRepo.Knot

-	// Convert workflow steps to jobbuilder format
-	jobSteps := make([]jobbuilder.WorkflowStep, 0, len(wf.Steps))
-	for _, step := range wf.Steps {
-		jobSteps = append(jobSteps, jobbuilder.WorkflowStep{
-			Name:    step.Name(),
-			Command: step.Command(),
-			Env:     nil, // TODO: Extract from step if available
-		})
-	}
-
-	// Build Job configuration
-	jobConfig := jobbuilder.WorkflowConfig{
-		WorkflowName: wf.Name,
-		PipelineID:   wid.PipelineId.Rkey, // Use rkey as pipeline ID
-		Image:        spec.Image,
-		Architecture: spec.Architecture,
-		Steps:        jobSteps,
-		RepoURL:      repoURL,
-		CommitSHA:    commitSHA,
-		Secrets:      nil, // TODO: Handle secrets
-		Template:     e.template,
-		Namespace:    e.namespace,
-		Knot:         e.currentKnot,
+	// Generate SpindleSet name: spindle-{pipelineID} (truncated if needed)
+	spindleSetName := fmt.Sprintf("spindle-%s", wid.PipelineId.Rkey)
+	if len(spindleSetName) > 63 {
+		// Kubernetes names must be 63 chars or less
+		spindleSetName = spindleSetName[:63]
 	}

-	// Create the Job
-	job, err := jobbuilder.BuildJob(jobConfig)
-	if err != nil {
-		return fmt.Errorf("failed to build job: %w", err)
+	// Create SpindleSet CR for this pipeline run
+	spindleSet := &loomv1alpha1.SpindleSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      spindleSetName,
+			Namespace: e.namespace,
+			Labels: map[string]string{
+				"loom.j5t.io/component":   "spindle",
+				"loom.j5t.io/pipeline-id": wid.PipelineId.Rkey,
+			},
+		},
+		Spec: loomv1alpha1.SpindleSetSpec{
+			Template: e.template,
+			PipelineRun: &loomv1alpha1.PipelineRunSpec{
+				PipelineID: wid.PipelineId.Rkey,
+				Knot:       e.currentKnot,
+				RepoURL:    repoURL,
+				CommitSHA:  commitSHA,
+				Workflows:  []loomv1alpha1.WorkflowSpec{spec},
+			},
+		},
 	}

-	// Create the Job in Kubernetes
-	logger.Info("Creating Kubernetes Job", "jobName", job.Name)
-	if err := e.client.Create(ctx, job); err != nil {
-		return fmt.Errorf("failed to create Kubernetes Job: %w", err)
+	// Create the SpindleSet in Kubernetes
+	logger.Info("Creating SpindleSet", "spindleSetName", spindleSetName)
+	if err := e.client.Create(ctx, spindleSet); err != nil {
+		return fmt.Errorf("failed to create SpindleSet: %w", err)
 	}

-	// Track the job for cleanup
-	e.jobs[wid.String()] = job
+	// Track the SpindleSet for cleanup
+	e.spindleSets[wid.String()] = spindleSet

-	logger.Info("Kubernetes Job created successfully", "jobName", job.Name)
+	logger.Info("SpindleSet created successfully", "spindleSetName", spindleSetName)
 	return nil
 }
···
 	return 1 * time.Hour
 }

-// DestroyWorkflow cleans up the Kubernetes Job after completion.
+// DestroyWorkflow cleans up the SpindleSet after completion.
 func (e *KubernetesEngine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
 	logger := log.FromContext(ctx).WithValues("workflow", wid.Name, "pipeline", wid.PipelineId.Rkey)

-	job, exists := e.jobs[wid.String()]
+	spindleSet, exists := e.spindleSets[wid.String()]
 	if !exists {
-		logger.Info("No job found to destroy")
+		logger.Info("No SpindleSet found to destroy")
 		return nil
 	}

-	// Delete the Job
-	// Note: Job has TTLSecondsAfterFinished set, so it will auto-cleanup
-	// This is just for explicit cleanup if needed
-	logger.Info("Cleaning up Kubernetes Job", "jobName", job.Name)
+	// Delete the SpindleSet
+	// Note: Jobs owned by SpindleSet will be automatically deleted by Kubernetes GC
+	logger.Info("Cleaning up SpindleSet", "spindleSetName", spindleSet.Name)

-	// Delete with propagation policy to remove pods
+	// Delete with propagation policy to remove owned resources
 	deletePolicy := metav1.DeletePropagationForeground
 	deleteOptions := &client.DeleteOptions{
 		PropagationPolicy: &deletePolicy,
 	}

-	if err := e.client.Delete(ctx, job, deleteOptions); err != nil {
-		// Ignore not found errors (job may have already been deleted by TTL)
+	if err := e.client.Delete(ctx, spindleSet, deleteOptions); err != nil {
+		// Ignore not found errors (SpindleSet may have already been deleted)
 		if client.IgnoreNotFound(err) != nil {
-			return fmt.Errorf("failed to delete Kubernetes Job: %w", err)
+			return fmt.Errorf("failed to delete SpindleSet: %w", err)
 		}
 	}

 	// Remove from tracking map
-	delete(e.jobs, wid.String())
+	delete(e.spindleSets, wid.String())

-	logger.Info("Kubernetes Job cleaned up successfully")
+	logger.Info("SpindleSet cleaned up successfully")
 	return nil
 }
···
 		return nil
 	}

-	job, exists := e.jobs[wid.String()]
+	// Query for the Job created by SpindleSetReconciler
+	spindleSet, exists := e.spindleSets[wid.String()]
 	if !exists {
-		return fmt.Errorf("no job found for workflow")
+		return fmt.Errorf("no SpindleSet found for workflow")
 	}

-	logger.Info("Waiting for Kubernetes Job to complete", "jobName", job.Name)
-
-	// Wait for the Job to complete (with timeout from WorkflowTimeout)
+	// Wait for Job to be created by controller
+	var job *batchv1.Job
+	deadline := time.Now().Add(5 * time.Minute)
+	for {
+		if time.Now().After(deadline) {
+			return fmt.Errorf("timeout waiting for Job to be created by controller")
+		}
+
+		jobList := &batchv1.JobList{}
+		err := e.client.List(ctx, jobList,
+			client.InNamespace(e.namespace),
+			client.MatchingLabels{
+				"loom.j5t.io/spindleset": spindleSet.Name,
+				"loom.j5t.io/workflow":   w.Name,
+			})
+		if err != nil {
+			return fmt.Errorf("failed to list jobs: %w", err)
+		}
+
+		if len(jobList.Items) > 0 {
+			job = &jobList.Items[0]
+			break
+		}
+
+		time.Sleep(2 * time.Second)
+	}
+
+	logger.Info("Found Job for workflow, waiting for completion", "jobName", job.Name)
+
+	// Stream logs from the Job's pod
+	if wfLogger != nil {
+		if err := e.streamJobLogs(ctx, job, w, wfLogger); err != nil {
+			logger.Error(err, "Failed to stream logs")
+			// Don't return error - continue to check Job status
+		}
+	}
+
+	// Wait for the Job to complete
+	return e.waitForJobCompletion(ctx, job)
+}
+
+// waitForJobCompletion polls the Job status until it completes or times out
+func (e *KubernetesEngine) waitForJobCompletion(ctx context.Context, job *batchv1.Job) error {
+	logger := log.FromContext(ctx).WithValues("jobName", job.Name)
+
 	timeout := time.After(e.WorkflowTimeout())
 	ticker := time.NewTicker(5 * time.Second)
 	defer ticker.Stop()
···
 			// Still running, continue waiting
 		}
 	}
+}
+
+// streamJobLogs streams logs from the Job's pod to the WorkflowLogger
+func (e *KubernetesEngine) streamJobLogs(ctx context.Context, job *batchv1.Job, workflow *models.Workflow, wfLogger *models.WorkflowLogger) error {
+	logger := log.FromContext(ctx).WithValues("jobName", job.Name)
+
+	// Create kubernetes clientset for log streaming
+	clientset, err := kubernetes.NewForConfig(e.config)
+	if err != nil {
+		return fmt.Errorf("failed to create kubernetes clientset: %w", err)
+	}
+
+	// Wait for pod to be created
+	var pod *corev1.Pod
+	deadline := time.Now().Add(2 * time.Minute)
+	for {
+		if time.Now().After(deadline) {
+			return fmt.Errorf("timeout waiting for pod to be created")
+		}
+
+		pods := &corev1.PodList{}
+		err := e.client.List(ctx, pods, client.InNamespace(job.Namespace), client.MatchingLabels(job.Spec.Template.Labels))
+		if err != nil {
+			return fmt.Errorf("failed to list pods: %w", err)
+		}
+
+		if len(pods.Items) > 0 {
+			pod = &pods.Items[0]
+			break
+		}
+
+		time.Sleep(1 * time.Second)
+	}
+
+	logger.Info("Found pod for job", "podName", pod.Name)
+
+	// Wait for pod to be running (or completed)
+	deadline = time.Now().Add(5 * time.Minute)
+	for {
+		if time.Now().After(deadline) {
+			return fmt.Errorf("timeout waiting for pod to start")
+		}
+
+		currentPod := &corev1.Pod{}
+		err := e.client.Get(ctx, client.ObjectKey{
+			Namespace: pod.Namespace,
+			Name:      pod.Name,
+		}, currentPod)
+		if err != nil {
+			return fmt.Errorf("failed to get pod: %w", err)
+		}
+
+		if currentPod.Status.Phase == corev1.PodRunning || currentPod.Status.Phase == corev1.PodSucceeded || currentPod.Status.Phase == corev1.PodFailed {
+			pod = currentPod
+			break
+		}
+
+		time.Sleep(1 * time.Second)
+	}
+
+	logger.Info("Pod is ready, streaming logs", "podName", pod.Name, "phase", pod.Status.Phase)
+
+	// Stream logs from the main container (not init containers)
+	req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{
+		Container: "runner",
+		Follow:    true,
+	})
+
+	stream, err := req.Stream(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to open log stream: %w", err)
+	}
+	defer stream.Close()
+
+	// Parse logs and send to WorkflowLogger
+	return e.parseLogs(stream, workflow, wfLogger)
+}
+
+// parseLogs reads log lines, detects step transitions, and sends logs to WorkflowLogger
+func (e *KubernetesEngine) parseLogs(stream io.Reader, workflow *models.Workflow, wfLogger *models.WorkflowLogger) error {
+	scanner := bufio.NewScanner(stream)
+
+	// Regex patterns to detect step transitions
+	startPattern := regexp.MustCompile(`===== Starting Step (\d+): (.+) =====`)
+	completePattern := regexp.MustCompile(`===== Completed Step (\d+): (.+) =====`)
+
+	currentStep := -1 // -1 means no step started yet (init containers)
+	stepStarted := make(map[int]bool)
+
+	for scanner.Scan() {
+		line := scanner.Text()
+
+		// Check for step start
+		if matches := startPattern.FindStringSubmatch(line); matches != nil {
+			stepNum, _ := strconv.Atoi(matches[1])
+			stepIdx := stepNum - 1 // Convert 1-based to 0-based
+
+			if stepIdx >= 0 && stepIdx < len(workflow.Steps) {
+				currentStep = stepIdx
+
+				// Send control message for step start
+				if !stepStarted[stepIdx] {
+					step := workflow.Steps[stepIdx]
+					controlWriter := wfLogger.ControlWriter(stepIdx, step, models.StepStatusStart)
+					controlWriter.Write([]byte{})
+					stepStarted[stepIdx] = true
+				}
+			}
+		}
+
+		// Check for step completion
+		if matches := completePattern.FindStringSubmatch(line); matches != nil {
+			stepNum, _ := strconv.Atoi(matches[1])
+			stepIdx := stepNum - 1
+
+			if stepIdx >= 0 && stepIdx < len(workflow.Steps) {
+				step := workflow.Steps[stepIdx]
+				controlWriter := wfLogger.ControlWriter(stepIdx, step, models.StepStatusEnd)
+				controlWriter.Write([]byte{})
+			}
+		}
+
+		// Send data log line
+		if currentStep >= 0 {
+			dataWriter := wfLogger.DataWriter(currentStep, "stdout")
+			dataWriter.Write([]byte(line + "\n"))
+		}
+	}
+
+	if err := scanner.Err(); err != nil {
+		// EOF or context canceled is expected
+		if err != io.EOF && !strings.Contains(err.Error(), "context canceled") {
+			return fmt.Errorf("error reading logs: %w", err)
+		}
+	}
+
+	return nil
 }

 // Ensure KubernetesEngine implements the Engine interface
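The regexes in parseLogs pin down the marker format the runner script must emit around each step. A sketch of a pod log stream it would parse correctly — step names and command output are hypothetical; only the `===== Starting/Completed Step N: name =====` framing is fixed by the patterns above:

    ===== Starting Step 1: checkout =====
    Cloning into '/workspace'...
    ===== Completed Step 1: checkout =====
    ===== Starting Step 2: build =====
    go build ./...
    ===== Completed Step 2: build =====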
+9
pkg/jobbuilder/job_template.go
···
 	// PipelineID is a unique identifier for this pipeline run
 	PipelineID string

+	// SpindleSetName is the name of the owning SpindleSet resource
+	SpindleSetName string
+
 	// Image is the container image to use for execution
 	Image string
···
 	if config.PipelineID == "" {
 		return nil, fmt.Errorf("pipeline ID is required")
 	}
+	if config.SpindleSetName == "" {
+		return nil, fmt.Errorf("spindleset name is required")
+	}

 	// Generate the step execution script
 	script := BuildStepExecutionScript(config.Steps)
···
 		"loom.j5t.io/component":   "spindle",
 		"loom.j5t.io/pipeline-id": config.PipelineID,
 		"loom.j5t.io/workflow":    config.WorkflowName,
+		"loom.j5t.io/spindleset":  config.SpindleSetName,
 	}

 	// Backoff limit: don't retry failed jobs
 	backoffLimit := int32(0)
 	ttlAfterFinished := int32(3600) // Clean up Jobs after 1 hour
+	activeDeadline := int64(7200)   // Force-fail Jobs after 2 hours

 	job := &batchv1.Job{
 		ObjectMeta: metav1.ObjectMeta{
···
 		Spec: batchv1.JobSpec{
 			BackoffLimit:            &backoffLimit,
 			TTLSecondsAfterFinished: &ttlAfterFinished,
+			ActiveDeadlineSeconds:   &activeDeadline,
 			Template: corev1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
 					Labels: labels,
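Taken together, these knobs land on every built Job. A sketch of the resulting metadata and spec fields — the numeric values come from the constants above; the label values are hypothetical:

    metadata:
      labels:
        loom.j5t.io/component: spindle
        loom.j5t.io/pipeline-id: "3jxyz"             # hypothetical pipeline rkey
        loom.j5t.io/workflow: workflow-amd64.yaml
        loom.j5t.io/spindleset: spindle-3jxyz
    spec:
      backoffLimit: 0                 # don't retry failed jobs
      ttlSecondsAfterFinished: 3600   # clean up Jobs after 1 hour
      activeDeadlineSeconds: 7200     # force-fail Jobs after 2 hours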