Kubernetes Operator for Tangled Spindles
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix linting

+44 -24
+12
.golangci.yml
··· 1 1 version: "2" 2 2 run: 3 3 allow-parallel-runners: true 4 + issues: 5 + fix: true 4 6 linters: 5 7 default: none 6 8 enable: ··· 28 30 - name: import-shadowing 29 31 exclusions: 30 32 generated: lax 33 + presets: 34 + - std-error-handling 31 35 rules: 32 36 - linters: 33 37 - lll ··· 36 40 - dupl 37 41 - lll 38 42 path: internal/* 43 + - linters: 44 + - errcheck 45 + path: _test\.go 39 46 paths: 40 47 - third_party$ 41 48 - builtin$ ··· 44 51 enable: 45 52 - gofmt 46 53 - goimports 54 + settings: 55 + gofmt: 56 + rewrite-rules: 57 + - pattern: interface{} 58 + replacement: any 47 59 exclusions: 48 60 generated: lax 49 61 paths:
+1 -1
Makefile
··· 309 309 ENVTEST_VERSION ?= $(shell go list -m -f "{{ .Version }}" sigs.k8s.io/controller-runtime | awk -F'[v.]' '{printf "release-%d.%d", $$2, $$3}') 310 310 #ENVTEST_K8S_VERSION is the version of Kubernetes to use for setting up ENVTEST binaries (i.e. 1.31) 311 311 ENVTEST_K8S_VERSION ?= $(shell go list -m -f "{{ .Version }}" k8s.io/api | awk -F'[v.]' '{printf "1.%d", $$3}') 312 - GOLANGCI_LINT_VERSION ?= v2.1.0 312 + GOLANGCI_LINT_VERSION ?= v2.11.4 313 313 314 314 .PHONY: kustomize 315 315 kustomize: $(KUSTOMIZE) ## Download kustomize locally if necessary.
+3 -1
cmd/controller/main.go
··· 176 176 } 177 177 178 178 // initializeSpindle creates a spindle server with KubernetesEngine 179 - func initializeSpindle(ctx context.Context, cfg *config.Config, mgr ctrl.Manager, loomCfg *LoomConfig) (*spindle.Spindle, error) { 179 + func initializeSpindle( 180 + ctx context.Context, cfg *config.Config, mgr ctrl.Manager, loomCfg *LoomConfig, 181 + ) (*spindle.Spindle, error) { 180 182 // Initialize Kubernetes engine 181 183 // Get namespace from environment (injected via Downward API) 182 184 namespace := os.Getenv("POD_NAMESPACE")
+2 -1
config/crd/bases/loom.j5t.io_spindlesets.yaml
··· 1240 1240 operator: 1241 1241 description: |- 1242 1242 Operator represents a key's relationship to the value. 1243 - Valid operators are Exists and Equal. Defaults to Equal. 1243 + Valid operators are Exists, Equal, Lt, and Gt. Defaults to Equal. 1244 1244 Exists is equivalent to wildcard for value, so that a pod can 1245 1245 tolerate all taints of a particular category. 1246 + Lt and Gt perform numeric comparisons (requires feature gate TaintTolerationComparisonOperators). 1246 1247 type: string 1247 1248 tolerationSeconds: 1248 1249 description: |-
+11 -6
internal/controller/spindleset_controller.go
··· 42 42 "tangled.org/evan.jarrett.net/loom/internal/jobbuilder" 43 43 ) 44 44 45 + const ( 46 + phaseSucceeded = "Succeeded" 47 + phaseFailed = "Failed" 48 + ) 49 + 45 50 // SpindleSetReconciler reconciles a SpindleSet object 46 51 type SpindleSetReconciler struct { 47 52 client.Client ··· 129 134 } 130 135 131 136 // If SpindleSet is terminal, don't create new jobs — check for TTL cleanup 132 - if spindleSet.Status.Phase == "Succeeded" || spindleSet.Status.Phase == "Failed" { 137 + if spindleSet.Status.Phase == phaseSucceeded || spindleSet.Status.Phase == phaseFailed { 133 138 age := time.Since(spindleSet.CreationTimestamp.Time) 134 139 const spindleSetMaxLifetime = 4 * time.Hour 135 140 if age > spindleSetMaxLifetime { ··· 316 321 317 322 // Compute Phase with latch semantics: once terminal, never revert. 318 323 // This prevents job recreation after TTL deletes completed Jobs. 319 - if spindleSet.Status.Phase != "Succeeded" && spindleSet.Status.Phase != "Failed" { 324 + if spindleSet.Status.Phase != phaseSucceeded && spindleSet.Status.Phase != phaseFailed { 320 325 expectedWorkflows := int32(len(spindleSet.Spec.PipelineRun.Workflows)) 321 326 totalTerminal := completed + failed 322 327 323 328 var newPhase string 324 329 switch { 325 330 case expectedWorkflows > 0 && totalTerminal >= expectedWorkflows && failed > 0: 326 - newPhase = "Failed" 331 + newPhase = phaseFailed 327 332 case expectedWorkflows > 0 && totalTerminal >= expectedWorkflows: 328 - newPhase = "Succeeded" 333 + newPhase = phaseSucceeded 329 334 case running > 0: 330 335 newPhase = "Running" 331 336 default: ··· 405 410 // ensurePipelineJobs ensures Jobs are created for all workflows in a pipeline run 406 411 func (r *SpindleSetReconciler) ensurePipelineJobs(ctx context.Context, spindleSet *loomv1alpha1.SpindleSet) error { 407 412 // Don't recreate jobs for completed pipelines 408 - if spindleSet.Status.Phase == "Succeeded" || spindleSet.Status.Phase == "Failed" { 413 + if spindleSet.Status.Phase == phaseSucceeded || spindleSet.Status.Phase == phaseFailed { 409 414 return nil 410 415 } 411 416 ··· 465 470 466 471 // List nodes for profile selection (to validate nodeSelector labels exist) 467 472 var nodeList corev1.NodeList 468 - if err := r.Client.List(ctx, &nodeList); err != nil { 473 + if err := r.List(ctx, &nodeList); err != nil { 469 474 return fmt.Errorf("failed to list nodes: %w", err) 470 475 } 471 476
+14 -14
internal/engine/kubernetes_engine.go
··· 11 11 "sync" 12 12 "time" 13 13 14 - "github.com/cyphar/filepath-securejoin" 14 + securejoin "github.com/cyphar/filepath-securejoin" 15 15 "gopkg.in/yaml.v3" 16 16 batchv1 "k8s.io/api/batch/v1" 17 17 corev1 "k8s.io/api/core/v1" ··· 59 59 } 60 60 61 61 // NewKubernetesEngine creates a new Kubernetes-based spindle engine. 62 - func NewKubernetesEngine(client client.Client, config *rest.Config, namespace string, template loomv1alpha1.SpindleTemplate, vault secrets.Manager) *KubernetesEngine { 62 + func NewKubernetesEngine(k8sClient client.Client, config *rest.Config, namespace string, template loomv1alpha1.SpindleTemplate, vault secrets.Manager) *KubernetesEngine { 63 63 return &KubernetesEngine{ 64 - client: client, 64 + client: k8sClient, 65 65 config: config, 66 66 namespace: namespace, 67 67 template: template, ··· 164 164 165 165 // SetupWorkflow creates a SpindleSet CR for the workflow. 166 166 func (e *KubernetesEngine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error { 167 - logger := log.FromContext(ctx).WithValues("workflow", wid.Name, "pipeline", wid.PipelineId.Rkey) 167 + logger := log.FromContext(ctx).WithValues("workflow", wid.Name, "pipeline", wid.Rkey) 168 168 169 169 // Extract pre-computed workflow data 170 170 data, ok := wf.Data.(*kubernetesWorkflowData) ··· 216 216 sanitizedWorkflowName = strings.TrimSuffix(sanitizedWorkflowName, ".yml") 217 217 sanitizedWorkflowName = strings.ReplaceAll(sanitizedWorkflowName, ".", "-") 218 218 219 - spindleSetName := fmt.Sprintf("spindle-%s-%s", sanitizedWorkflowName, wid.PipelineId.Rkey) 219 + spindleSetName := fmt.Sprintf("spindle-%s-%s", sanitizedWorkflowName, wid.Rkey) 220 220 if len(spindleSetName) > 63 { 221 221 // Kubernetes names must be 63 chars or less 222 222 spindleSetName = spindleSetName[:63] ··· 226 226 // Knot is extracted from the pipeline ID provided by the framework 227 227 skipClone := len(data.CloneStep.Commands()) == 0 228 228 pipelineRun := &loomv1alpha1.PipelineRunSpec{ 229 - PipelineID: wid.PipelineId.Rkey, 229 + PipelineID: wid.Rkey, 230 230 SkipClone: skipClone, 231 231 Secrets: repoSecrets, 232 232 Workflows: []loomv1alpha1.WorkflowSpec{data.Spec}, ··· 244 244 Namespace: e.namespace, 245 245 Labels: map[string]string{ 246 246 "loom.j5t.io/component": "spindle", 247 - "loom.j5t.io/pipeline-id": wid.PipelineId.Rkey, 247 + "loom.j5t.io/pipeline-id": wid.Rkey, 248 248 "loom.j5t.io/workflow": wf.Name, 249 249 }, 250 250 }, ··· 286 286 if err := e.client.List(ctx, spindleSetList, 287 287 client.InNamespace(e.namespace), 288 288 client.MatchingLabels{ 289 - "loom.j5t.io/pipeline-id": wid.PipelineId.Rkey, 289 + "loom.j5t.io/pipeline-id": wid.Rkey, 290 290 "loom.j5t.io/workflow": wid.Name, 291 291 }); err != nil { 292 292 return nil, fmt.Errorf("failed to query SpindleSet: %w", err) ··· 304 304 305 305 // DestroyWorkflow cleans up the SpindleSet after completion. 306 306 func (e *KubernetesEngine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 307 - logger := log.FromContext(ctx).WithValues("workflow", wid.Name, "pipeline", wid.PipelineId.Rkey) 307 + logger := log.FromContext(ctx).WithValues("workflow", wid.Name, "pipeline", wid.Rkey) 308 308 309 309 spindleSet, err := e.getSpindleSet(ctx, wid) 310 310 if err != nil { ··· 352 352 // RunStep streams logs for the specific step and waits for that step to complete. 353 353 // For Kubernetes engine, all steps run in a single Job, but we stream logs incrementally 354 354 // as each step executes. Each RunStep call blocks until that step's "end" control event is received. 355 - func (e *KubernetesEngine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error { 356 - logger := log.FromContext(ctx).WithValues("workflow", wid.Name, "pipeline", wid.PipelineId.Rkey, "step", idx) 355 + func (e *KubernetesEngine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, wfSecrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error { 356 + logger := log.FromContext(ctx).WithValues("workflow", wid.Name, "pipeline", wid.Rkey, "step", idx) 357 357 358 358 // Query for the Job created by SpindleSetReconciler (only on first step) 359 359 var job *batchv1.Job ··· 379 379 client.MatchingLabels{ 380 380 "loom.j5t.io/spindleset": spindleSet.Name, 381 381 "loom.j5t.io/workflow": w.Name, 382 - "loom.j5t.io/pipeline-id": wid.PipelineId.Rkey, 382 + "loom.j5t.io/pipeline-id": wid.Rkey, 383 383 }) 384 384 if err != nil { 385 385 return fmt.Errorf("failed to list jobs: %w", err) ··· 437 437 } 438 438 439 439 // Create new stream 440 - logger := log.FromContext(ctx).WithValues("workflow", wid.Name, "pipeline", wid.PipelineId.Rkey) 440 + logger := log.FromContext(ctx).WithValues("workflow", wid.Name, "pipeline", wid.Rkey) 441 441 442 442 // Create kubernetes clientset for log streaming 443 443 clientset, err := kubernetes.NewForConfig(e.config) ··· 591 591 logLine.Stream = "stdout" // Default to stdout 592 592 } 593 593 dataWriter := wfLogger.DataWriter(logLine.StepId, logLine.Stream) 594 - dataWriter.Write([]byte(logLine.Content + "\n")) 594 + _, _ = dataWriter.Write([]byte(logLine.Content + "\n")) 595 595 } 596 596 } 597 597
+1 -1
internal/jobbuilder/job_template.go
··· 287 287 // - Engine-specific vars (PATH, TANGLED_ARCHITECTURE, HOME) set in InitWorkflow 288 288 // - Pipeline-level vars (TANGLED_REPO_*, TANGLED_REF, CI, etc.) injected by framework 289 289 func buildEnvironmentVariables(config WorkflowConfig) []corev1.EnvVar { 290 - var env []corev1.EnvVar 290 + env := make([]corev1.EnvVar, 0, len(config.WorkflowSpec.Environment)) 291 291 for key, value := range config.WorkflowSpec.Environment { 292 292 env = append(env, corev1.EnvVar{ 293 293 Name: key,