Kubernetes Operator for Tangled Spindles
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

restore back logs api

+281 -187
+18 -45
cmd/runner/main.go
··· 35 35 stream grpc.BidiStreamingClient[pb.ConnectRequest, pb.ConnectResponse] 36 36 } 37 37 38 - func (e *grpcEmitter) sendStepControl(stepID int, status string, exitCode int) { 39 - if e.stream != nil { 40 - _ = e.stream.Send(&pb.ConnectRequest{ 41 - Event: &pb.ConnectRequest_StepControl{ 42 - StepControl: &pb.StepControl{ 43 - StepId: int32(stepID), 44 - Status: status, 45 - ExitCode: int32(exitCode), 46 - }, 47 - }, 48 - }) 49 - } 50 - } 51 - 52 - func (e *grpcEmitter) sendLogLine(stepID int, streamName, content string) { 53 - if e.stream != nil { 54 - _ = e.stream.Send(&pb.ConnectRequest{ 55 - Event: &pb.ConnectRequest_LogLine{ 56 - LogLine: &pb.LogLine{ 57 - StepId: int32(stepID), 58 - Stream: streamName, 59 - Content: content, 60 - }, 61 - }, 62 - }) 63 - } 64 - } 65 - 66 38 func main() { 67 39 // Handle --install flag for self-copying (used by init container in distroless image) 68 40 if len(os.Args) >= 3 && os.Args[1] == "--install" { ··· 131 103 return fmt.Errorf("failed to parse workflow spec: %w", err) 132 104 } 133 105 134 - // Connect to operator via gRPC 106 + // Connect to operator via gRPC (optional — only needed for artifact transfer). 107 + // Single-arch workflows work without gRPC; all log/control events go to stdout. 135 108 emitter, cleanup, err := connectToOperator(workflow) 136 109 if err != nil { 137 - // gRPC connection failure is fatal — the operator won't see our events 138 - return fmt.Errorf("failed to connect to operator: %w", err) 110 + needsGRPC := os.Getenv("LOOM_MATRIX_LEG") == "true" || os.Getenv("LOOM_FINAL") == "true" 111 + if needsGRPC { 112 + return fmt.Errorf("gRPC required for artifact transfer but failed to connect: %w", err) 113 + } 114 + fmt.Fprintf(os.Stderr, "WARNING: gRPC connection failed (artifacts unavailable): %v\n", err) 115 + emitter = &grpcEmitter{} 116 + cleanup = func() {} 139 117 } 140 118 defer cleanup() 141 119 ··· 169 147 // Execute each step 170 148 ctx := context.Background() 171 149 for i, step := range workflow.Steps { 172 - if err := executeStep(ctx, i, step, emitter); err != nil { 150 + if err := executeStep(ctx, i, step); err != nil { 173 151 return fmt.Errorf("step %d (%s) failed: %w", i, step.Name, err) 174 152 } 175 153 } ··· 228 206 return emitter, cleanup, nil 229 207 } 230 208 231 - func executeStep(ctx context.Context, stepID int, step loomv1alpha1.WorkflowStep, emitter *grpcEmitter) error { 232 - // Emit step start — gRPC + stdout 233 - emitter.sendStepControl(stepID, "start", 0) 209 + func executeStep(ctx context.Context, stepID int, step loomv1alpha1.WorkflowStep) error { 210 + // Emit step start to stdout (consumed by controller via pod logs API) 234 211 emitStdoutControl(stepID, &simpleStep{name: step.Name, command: step.Command}, models.StepStatusStart) 235 212 236 213 // Set step-specific environment variables ··· 259 236 } 260 237 261 238 if err := cmd.Start(); err != nil { 262 - emitter.sendStepControl(stepID, "end", 1) 239 + emitStdoutControlWithCode(stepID, &simpleStep{name: step.Name, command: step.Command}, models.StepStatusEnd, 1) 263 240 return fmt.Errorf("failed to start command: %w", err) 264 241 } 265 242 266 243 // Stream stdout and stderr concurrently 267 244 done := make(chan error, 2) 268 - go streamOutput(stdout, stepID, "stdout", emitter, done) 269 - go streamOutput(stderr, stepID, "stderr", emitter, done) 245 + go streamOutput(stdout, stepID, "stdout", done) 246 + go streamOutput(stderr, stepID, "stderr", done) 270 247 271 248 for i := 0; i < 2; i++ { 272 249 if err := <-done; err != nil { ··· 285 262 } 286 263 } 287 264 288 - // Emit step end — gRPC + stdout 289 - emitter.sendStepControl(stepID, "end", exitCode) 265 + // Emit step end to stdout (consumed by controller via pod logs API) 290 266 emitStdoutControlWithCode(stepID, &simpleStep{name: step.Name, command: step.Command}, models.StepStatusEnd, exitCode) 291 267 292 268 if exitCode != 0 { ··· 296 272 return nil 297 273 } 298 274 299 - func streamOutput(reader io.Reader, stepID int, streamName string, emitter *grpcEmitter, done chan<- error) { 275 + func streamOutput(reader io.Reader, stepID int, streamName string, done chan<- error) { 300 276 scanner := bufio.NewScanner(reader) 301 277 buf := make([]byte, 0, 64*1024) 302 278 scanner.Buffer(buf, 1024*1024) 303 279 304 280 for scanner.Scan() { 305 281 line := scanner.Text() 306 - // Send over gRPC (primary channel) 307 - emitter.sendLogLine(stepID, streamName, line) 308 - // Also emit to stdout for kubectl logs 309 282 emitStdoutData(stepID, streamName, line) 310 283 } 311 284 312 285 done <- scanner.Err() 313 286 } 314 287 315 - // Stdout emitters — for kubectl logs debugging. Not consumed by the operator. 288 + // Stdout emitters — log content is consumed by the operator via the k8s pod logs API. 316 289 317 290 func emitStdoutControl(stepID int, step models.Step, status models.StepStatus) { 318 291 logLine := models.NewControlLogLine(stepID, step, status)
+259 -107
internal/engine/kubernetes_engine.go
··· 1 1 package engine 2 2 3 3 import ( 4 + "bufio" 4 5 "context" 6 + "encoding/json" 5 7 "fmt" 8 + "io" 6 9 "maps" 7 10 "strings" 11 + "sync" 8 12 "time" 9 13 10 14 securejoin "github.com/cyphar/filepath-securejoin" 11 15 "gopkg.in/yaml.v3" 16 + corev1 "k8s.io/api/core/v1" 12 17 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 18 + "k8s.io/client-go/kubernetes" 13 19 "k8s.io/client-go/rest" 14 20 "sigs.k8s.io/controller-runtime/pkg/client" 15 21 "sigs.k8s.io/controller-runtime/pkg/log" ··· 49 55 return 1000 + (numLegs+1)*100 + stepIdx 50 56 } 51 57 58 + // workflowLogStream holds the state for streaming logs from a workflow's pod. 59 + type workflowLogStream struct { 60 + scanner *bufio.Scanner 61 + stream io.ReadCloser 62 + pod *corev1.Pod 63 + } 64 + 65 + // extendedLogLine extends models.LogLine with exit code for error reporting. 66 + type extendedLogLine struct { 67 + models.LogLine 68 + ExitCode int `json:"exit_code,omitempty"` 69 + } 70 + 52 71 // KubernetesEngine implements the spindle Engine interface for Kubernetes Jobs. 53 72 type KubernetesEngine struct { 54 73 client client.Client ··· 61 80 62 81 // Track created SpindleSets for cleanup 63 82 spindleSets map[string]*loomv1alpha1.SpindleSet 83 + 84 + // Active log streams per workflow - persist across RunStep calls 85 + logStreams map[string]*workflowLogStream 86 + streamMutex sync.RWMutex 64 87 } 65 88 66 89 // NewKubernetesEngine creates a new Kubernetes-based spindle engine. ··· 74 97 hub: hub, 75 98 artifacts: artifacts, 76 99 spindleSets: make(map[string]*loomv1alpha1.SpindleSet), 100 + logStreams: make(map[string]*workflowLogStream), 77 101 } 78 102 } 79 103 ··· 525 549 // Remove from tracking map 526 550 delete(e.spindleSets, wid.String()) 527 551 552 + // Close any open log streams for this workflow 553 + e.closeLogStream(wid.Name + "/" + wid.Rkey) 554 + 528 555 // Clean up artifacts for this pipeline 529 556 if e.artifacts != nil { 530 557 if err := e.artifacts.Cleanup(wid.PipelineId.AtUri().String()); err != nil { ··· 536 563 return nil 537 564 } 538 565 539 - // RunStep waits for step completion events from the runner via the gRPC hub. 566 + // findPodForWorkflow locates the runner pod for a workflow by its labels. 567 + // Polls until a pod exists and is Running, Succeeded, or Failed. 568 + func (e *KubernetesEngine) findPodForWorkflow(ctx context.Context, workflowName, pipelineID string) (*corev1.Pod, error) { 569 + deadline := time.Now().Add(7 * time.Minute) 570 + for { 571 + if time.Now().After(deadline) { 572 + return nil, fmt.Errorf("timeout waiting for pod for workflow %s", workflowName) 573 + } 574 + 575 + pods := &corev1.PodList{} 576 + if err := e.client.List(ctx, pods, 577 + client.InNamespace(e.namespace), 578 + client.MatchingLabels{ 579 + "loom.j5t.io/pipeline-id": pipelineID, 580 + "loom.j5t.io/workflow": workflowName, 581 + }, 582 + ); err != nil { 583 + return nil, fmt.Errorf("failed to list pods: %w", err) 584 + } 585 + 586 + for i := range pods.Items { 587 + pod := &pods.Items[i] 588 + switch pod.Status.Phase { 589 + case corev1.PodRunning, corev1.PodSucceeded, corev1.PodFailed: 590 + return pod, nil 591 + } 592 + } 593 + 594 + select { 595 + case <-ctx.Done(): 596 + return nil, ctx.Err() 597 + case <-time.After(2 * time.Second): 598 + } 599 + } 600 + } 601 + 602 + // openLogStream opens a Kubernetes pod log stream for the runner container. 603 + func (e *KubernetesEngine) openLogStream(ctx context.Context, pod *corev1.Pod) (*workflowLogStream, error) { 604 + clientset, err := kubernetes.NewForConfig(e.config) 605 + if err != nil { 606 + return nil, fmt.Errorf("failed to create kubernetes clientset: %w", err) 607 + } 608 + 609 + // Only use Follow mode for running pods. For completed pods, we need to read 610 + // existing logs (Follow:true only streams NEW logs after connection). 611 + shouldFollow := pod.Status.Phase == corev1.PodRunning 612 + 613 + req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{ 614 + Container: "runner", 615 + Follow: shouldFollow, 616 + }) 617 + 618 + logStream, err := req.Stream(ctx) 619 + if err != nil { 620 + return nil, fmt.Errorf("failed to open log stream: %w", err) 621 + } 622 + 623 + scanner := bufio.NewScanner(logStream) 624 + buf := make([]byte, 0, 64*1024) 625 + scanner.Buffer(buf, 1024*1024) 626 + 627 + return &workflowLogStream{ 628 + scanner: scanner, 629 + stream: logStream, 630 + pod: pod, 631 + }, nil 632 + } 633 + 634 + // getOrCreateLogStream returns a cached log stream or creates a new one. 635 + func (e *KubernetesEngine) getOrCreateLogStream(ctx context.Context, workflowName, pipelineID string) (*workflowLogStream, error) { 636 + key := workflowName + "/" + pipelineID 637 + 638 + e.streamMutex.RLock() 639 + stream, exists := e.logStreams[key] 640 + e.streamMutex.RUnlock() 641 + if exists { 642 + return stream, nil 643 + } 644 + 645 + logger := log.FromContext(ctx) 646 + 647 + pod, err := e.findPodForWorkflow(ctx, workflowName, pipelineID) 648 + if err != nil { 649 + return nil, err 650 + } 651 + logger.Info("Found pod for log streaming", "pod", pod.Name, "phase", pod.Status.Phase) 652 + 653 + stream, err = e.openLogStream(ctx, pod) 654 + if err != nil { 655 + return nil, err 656 + } 657 + 658 + e.streamMutex.Lock() 659 + e.logStreams[key] = stream 660 + e.streamMutex.Unlock() 661 + 662 + return stream, nil 663 + } 664 + 665 + // closeLogStream closes and removes a log stream from the cache. 666 + func (e *KubernetesEngine) closeLogStream(key string) { 667 + e.streamMutex.Lock() 668 + defer e.streamMutex.Unlock() 669 + 670 + if stream, exists := e.logStreams[key]; exists { 671 + stream.stream.Close() 672 + delete(e.logStreams, key) 673 + } 674 + } 675 + 676 + // readUntilStepEnd reads from the pod log stream until the end control event 677 + // for the specified step. Data lines are forwarded to wfLogger. The stepIDMapper 678 + // translates runner step IDs to log step IDs (identity for single-arch, 679 + // matrixLegLogStepID/finalLogStepID for multi-arch). 680 + func (e *KubernetesEngine) readUntilStepEnd(ctx context.Context, stream *workflowLogStream, runnerStepID int, stepIDMapper func(int) int, wfLogger models.WorkflowLogger) error { 681 + scanner := stream.scanner 682 + 683 + for scanner.Scan() { 684 + line := scanner.Text() 685 + 686 + var logLine extendedLogLine 687 + if err := json.Unmarshal([]byte(line), &logLine); err != nil { 688 + continue 689 + } 690 + 691 + // Only process events for the current step 692 + if logLine.StepId != runnerStepID { 693 + continue 694 + } 695 + 696 + mappedID := stepIDMapper(runnerStepID) 697 + 698 + switch logLine.Kind { 699 + case models.LogKindControl: 700 + if logLine.StepStatus == models.StepStatusEnd { 701 + if logLine.ExitCode != 0 { 702 + return fmt.Errorf("step %d failed with exit code %d", runnerStepID, logLine.ExitCode) 703 + } 704 + return nil 705 + } 706 + 707 + case models.LogKindData: 708 + if wfLogger == nil { 709 + continue 710 + } 711 + logStream := logLine.Stream 712 + if logStream == "" { 713 + logStream = "stdout" 714 + } 715 + dataWriter := wfLogger.DataWriter(mappedID, logStream) 716 + _, _ = dataWriter.Write([]byte(logLine.Content + "\n")) 717 + } 718 + } 719 + 720 + if err := scanner.Err(); err != nil { 721 + return fmt.Errorf("error reading logs: %w", err) 722 + } 723 + 724 + // Scanner ended without seeing step end event — check pod status 725 + currentPod := &corev1.Pod{} 726 + if err := e.client.Get(ctx, client.ObjectKey{Namespace: stream.pod.Namespace, Name: stream.pod.Name}, currentPod); err == nil { 727 + if currentPod.Status.Phase == corev1.PodSucceeded { 728 + return nil 729 + } 730 + if currentPod.Status.Phase == corev1.PodFailed { 731 + return fmt.Errorf("pod failed before step %d completed", runnerStepID) 732 + } 733 + } 734 + 735 + return fmt.Errorf("log stream ended before step %d completed", runnerStepID) 736 + } 737 + 738 + // RunStep streams logs from the runner pod and waits for step completion. 540 739 // For single-arch workflows, blocks until the step's "end" control event. 541 740 // For multi-arch workflows: 542 741 // - idx 0: waits for ALL matrix leg runners to complete all their steps ··· 550 749 } 551 750 552 751 if data.IsMultiArch { 553 - return e.runMultiArchStep(ctx, wid, data, idx, wfLogger) 752 + return e.runMultiArchStep(ctx, wid, w, data, idx, wfLogger) 554 753 } 555 754 556 - // Single-arch: wait for one runner. 557 - // PipelineID must match what the runner sends — the framework injects 558 - // TANGLED_PIPELINE_ID as the full AT URI (see spindle/models.PipelineEnvVars), 559 - // and the runner uses that value when registering with the hub. 560 - key := loomgrpc.RunnerKey{ 561 - PipelineID: wid.PipelineId.AtUri().String(), 562 - WorkflowName: wid.Name, 563 - Architecture: data.Spec.Architecture, 755 + // Single-arch: get or create the pod log stream 756 + logStream, err := e.getOrCreateLogStream(ctx, wid.Name, wid.Rkey) 757 + if err != nil { 758 + return fmt.Errorf("failed to get log stream: %w", err) 564 759 } 565 760 566 - if idx == 0 { 567 - logger.Info("waiting for runner to connect", "key", key.String()) 568 - select { 569 - case <-e.hub.WaitForRunner(key): 570 - logger.Info("runner connected", "key", key.String()) 571 - case <-ctx.Done(): 572 - return fmt.Errorf("context canceled while waiting for runner: %w", ctx.Err()) 573 - case <-time.After(10 * time.Minute): 574 - return fmt.Errorf("timeout waiting for runner to connect") 575 - } 761 + // Read from stream until this step's end event 762 + logger.Info("Reading logs for step", "stepID", idx) 763 + if err := e.readUntilStepEnd(ctx, logStream, idx, func(id int) int { return id }, wfLogger); err != nil { 764 + e.closeLogStream(wid.Name + "/" + wid.Rkey) 765 + return fmt.Errorf("failed to read logs for step %d: %w", idx, err) 576 766 } 767 + logger.Info("Step completed", "stepID", idx) 577 768 578 - return e.waitForRunnerStep(ctx, key, idx, idx, wfLogger) 769 + // Close stream after last step 770 + if idx == len(w.Steps)-1 { 771 + e.closeLogStream(wid.Name + "/" + wid.Rkey) 772 + } 773 + 774 + return nil 579 775 } 580 776 581 777 // runMultiArchStep handles RunStep for multi-arch workflows. 582 - func (e *KubernetesEngine) runMultiArchStep(ctx context.Context, wid models.WorkflowId, data *kubernetesWorkflowData, idx int, wfLogger models.WorkflowLogger) error { 778 + func (e *KubernetesEngine) runMultiArchStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, data *kubernetesWorkflowData, idx int, wfLogger models.WorkflowLogger) error { 583 779 logger := log.FromContext(ctx).WithValues("workflow", wid.Name, "pipeline", wid.Rkey, "step", idx) 584 780 585 781 if idx == 0 { ··· 595 791 for legIdx, leg := range data.MatrixLegs { 596 792 go func() { 597 793 legName := fmt.Sprintf("%s-%s", data.Spec.Name, leg.Architecture) 598 - key := loomgrpc.RunnerKey{ 599 - PipelineID: wid.PipelineId.AtUri().String(), 600 - WorkflowName: legName, 601 - Architecture: leg.Architecture, 602 - } 603 794 604 - // Wait for this leg's runner to connect 605 - select { 606 - case <-e.hub.WaitForRunner(key): 607 - logger.Info("matrix leg runner connected", "key", key.String()) 608 - case <-ctx.Done(): 609 - results <- legResult{leg: leg, err: ctx.Err()} 610 - return 611 - case <-time.After(10 * time.Minute): 612 - results <- legResult{leg: leg, err: fmt.Errorf("timeout waiting for runner %s", key.String())} 795 + // Open pod log stream for this leg 796 + logStream, err := e.getOrCreateLogStream(ctx, legName, wid.Rkey) 797 + if err != nil { 798 + results <- legResult{leg: leg, err: fmt.Errorf("failed to get log stream for leg %s: %w", leg.Architecture, err)} 613 799 return 614 800 } 615 801 616 - // Wait for all steps in this leg to complete, emitting per-leg 617 - // control log lines so each architecture's output gets its own 618 - // step section in the rendered log. 802 + mapper := func(id int) int { return matrixLegLogStepID(legIdx, id) } 803 + 804 + // Wait for all steps in this leg to complete 619 805 for stepIdx, userStep := range data.Spec.Steps { 620 806 logStepID := matrixLegLogStepID(legIdx, stepIdx) 621 807 sStep := syntheticStep{ ··· 626 812 if wfLogger != nil { 627 813 _, _ = wfLogger.ControlWriter(logStepID, sStep, models.StepStatusStart).Write([]byte{0}) 628 814 } 629 - err := e.waitForRunnerStep(ctx, key, stepIdx, logStepID, wfLogger) 815 + err := e.readUntilStepEnd(ctx, logStream, stepIdx, mapper, wfLogger) 630 816 if wfLogger != nil { 631 817 _, _ = wfLogger.ControlWriter(logStepID, sStep, models.StepStatusEnd).Write([]byte{0}) 632 818 } ··· 636 822 } 637 823 } 638 824 825 + e.closeLogStream(legName + "/" + wid.Rkey) 639 826 results <- legResult{leg: leg, err: nil} 640 827 }() 641 828 } ··· 661 848 } 662 849 663 850 if idx == 1 && data.FinalSpec != nil { 664 - // Final phase: wait for the final runner 851 + // Final phase 665 852 finalName := fmt.Sprintf("%s-final", data.Spec.Name) 666 - key := loomgrpc.RunnerKey{ 667 - PipelineID: wid.PipelineId.AtUri().String(), 668 - WorkflowName: finalName, 669 - Architecture: data.FinalSpec.Architecture, 670 - } 853 + 854 + // Stream artifacts from matrix legs to the final runner via gRPC 855 + if e.artifacts != nil { 856 + key := loomgrpc.RunnerKey{ 857 + PipelineID: wid.PipelineId.AtUri().String(), 858 + WorkflowName: finalName, 859 + Architecture: data.FinalSpec.Architecture, 860 + } 671 861 672 - logger.Info("waiting for final runner to connect", "key", key.String()) 673 - select { 674 - case <-e.hub.WaitForRunner(key): 675 - logger.Info("final runner connected", "key", key.String()) 676 - case <-ctx.Done(): 677 - return fmt.Errorf("context canceled while waiting for final runner: %w", ctx.Err()) 678 - case <-time.After(10 * time.Minute): 679 - return fmt.Errorf("timeout waiting for final runner to connect") 680 - } 862 + // Wait for the final runner to connect to gRPC (needed for artifact transfer) 863 + logger.Info("waiting for final runner to connect for artifact transfer", "key", key.String()) 864 + select { 865 + case <-e.hub.WaitForRunner(key): 866 + logger.Info("final runner connected", "key", key.String()) 867 + case <-ctx.Done(): 868 + return fmt.Errorf("context canceled while waiting for final runner: %w", ctx.Err()) 869 + case <-time.After(10 * time.Minute): 870 + return fmt.Errorf("timeout waiting for final runner to connect") 871 + } 681 872 682 - // Stream artifacts from matrix legs to the final runner 683 - if e.artifacts != nil { 684 873 rs := e.hub.Get(key) 685 874 if rs != nil { 686 875 logger.Info("streaming artifacts to final runner", "pipeline", wid.Rkey) ··· 690 879 } 691 880 } 692 881 693 - // Wait for all final steps, emitting per-step control log lines so the 694 - // final phase's user steps are visible in the rendered log (the 695 - // framework only emits a single "Final" control entry above us). 882 + // Open pod log stream for final phase 883 + numLegs := len(data.MatrixLegs) 884 + logStream, err := e.getOrCreateLogStream(ctx, finalName, wid.Rkey) 885 + if err != nil { 886 + return fmt.Errorf("failed to get log stream for final phase: %w", err) 887 + } 888 + 889 + mapper := func(id int) int { return finalLogStepID(numLegs, id) } 890 + 891 + // Wait for all final steps 696 892 for stepIdx, finalStep := range data.FinalSpec.Steps { 697 - logStepID := finalLogStepID(len(data.MatrixLegs), stepIdx) 893 + logStepID := finalLogStepID(numLegs, stepIdx) 698 894 sStep := syntheticStep{ 699 895 name: fmt.Sprintf("%s (final)", finalStep.Name), 700 896 command: finalStep.Command, ··· 703 899 if wfLogger != nil { 704 900 _, _ = wfLogger.ControlWriter(logStepID, sStep, models.StepStatusStart).Write([]byte{0}) 705 901 } 706 - err := e.waitForRunnerStep(ctx, key, stepIdx, logStepID, wfLogger) 902 + err := e.readUntilStepEnd(ctx, logStream, stepIdx, mapper, wfLogger) 707 903 if wfLogger != nil { 708 904 _, _ = wfLogger.ControlWriter(logStepID, sStep, models.StepStatusEnd).Write([]byte{0}) 709 905 } ··· 712 908 } 713 909 } 714 910 911 + e.closeLogStream(finalName + "/" + wid.Rkey) 715 912 logger.Info("final steps completed") 716 913 return nil 717 914 } 718 915 719 916 return fmt.Errorf("unexpected step index %d for multi-arch workflow", idx) 720 - } 721 - 722 - // waitForRunnerStep reads events from a runner's gRPC stream until a specific 723 - // step completes. runnerStepID is matched against StepID fields on runner 724 - // events to filter out other steps' events; logStepID is the step id passed to 725 - // wfLogger.DataWriter when forwarding log content. For single-arch workflows 726 - // these are the same; for matrix legs they differ so each leg's logs land in 727 - // a distinct UI step. 728 - func (e *KubernetesEngine) waitForRunnerStep(ctx context.Context, key loomgrpc.RunnerKey, runnerStepID, logStepID int, wfLogger models.WorkflowLogger) error { 729 - rs := e.hub.Get(key) 730 - if rs == nil { 731 - return fmt.Errorf("runner not connected for %s", key.String()) 732 - } 733 - 734 - for { 735 - select { 736 - case evt := <-rs.Steps: 737 - if evt.StepID != runnerStepID { 738 - continue 739 - } 740 - if evt.Status == "end" { 741 - if evt.ExitCode != 0 { 742 - return fmt.Errorf("step %d failed with exit code %d", runnerStepID, evt.ExitCode) 743 - } 744 - return nil 745 - } 746 - 747 - case logEvt := <-rs.Logs: 748 - if logEvt.StepID != runnerStepID || wfLogger == nil { 749 - continue 750 - } 751 - stream := logEvt.Stream 752 - if stream == "" { 753 - stream = "stdout" 754 - } 755 - dataWriter := wfLogger.DataWriter(logStepID, stream) 756 - _, _ = dataWriter.Write([]byte(logEvt.Content + "\n")) 757 - 758 - case <-rs.Done: 759 - return fmt.Errorf("runner disconnected before step %d completed", runnerStepID) 760 - 761 - case <-ctx.Done(): 762 - return fmt.Errorf("context canceled during step %d: %w", runnerStepID, ctx.Err()) 763 - } 764 - } 765 917 } 766 918 767 919 // Ensure KubernetesEngine implements the Engine interface
+1 -20
internal/grpc/hub.go
··· 18 18 return fmt.Sprintf("%s/%s/%s", k.PipelineID, k.WorkflowName, k.Architecture) 19 19 } 20 20 21 - // StepEvent represents a step lifecycle event received from a runner. 22 - type StepEvent struct { 23 - StepID int 24 - Status string // "start" or "end" 25 - ExitCode int 26 - } 27 - 28 - // LogEvent represents a log line received from a runner. 29 - type LogEvent struct { 30 - StepID int 31 - Stream string // "stdout" or "stderr" 32 - Content string 33 - } 34 - 35 21 // ArtifactEvent represents an artifact chunk received from a runner. 36 22 type ArtifactEvent struct { 37 23 Path string ··· 40 26 } 41 27 42 28 // RunnerStream holds the channels for a single runner connection. 43 - // The gRPC server writes to these channels; the engine reads from them. 29 + // Used for artifact transfer between matrix legs and final jobs. 44 30 type RunnerStream struct { 45 - Steps chan StepEvent 46 - Logs chan LogEvent 47 - 48 31 // SendToRunner allows the engine to send messages back to the runner. 49 32 // The gRPC server reads from this channel and sends to the runner. 50 33 SendToRunner chan *pb.ConnectResponse ··· 55 38 56 39 func newRunnerStream() *RunnerStream { 57 40 return &RunnerStream{ 58 - Steps: make(chan StepEvent, 64), 59 - Logs: make(chan LogEvent, 256), 60 41 SendToRunner: make(chan *pb.ConnectResponse, 64), 61 42 Done: make(chan struct{}), 62 43 }
+3 -15
internal/grpc/server.go
··· 71 71 }() 72 72 73 73 // Process the first message's event (if any) 74 - s.processEvent(key, rs, msg) 74 + s.processEvent(key, msg) 75 75 76 76 // Start goroutine to send responses back to the runner 77 77 go func() { ··· 100 100 if err != nil { 101 101 return fmt.Errorf("recv error from %s: %w", key.String(), err) 102 102 } 103 - s.processEvent(key, rs, msg) 103 + s.processEvent(key, msg) 104 104 } 105 105 } 106 106 107 107 // processEvent routes a runner event to the appropriate channel. 108 - func (s *Server) processEvent(key RunnerKey, rs *RunnerStream, msg *pb.ConnectRequest) { 108 + func (s *Server) processEvent(key RunnerKey, msg *pb.ConnectRequest) { 109 109 logger := log.Log.WithName("grpc") 110 110 111 111 switch evt := msg.Event.(type) { 112 - case *pb.ConnectRequest_StepControl: 113 - rs.Steps <- StepEvent{ 114 - StepID: int(evt.StepControl.StepId), 115 - Status: evt.StepControl.Status, 116 - ExitCode: int(evt.StepControl.ExitCode), 117 - } 118 - case *pb.ConnectRequest_LogLine: 119 - rs.Logs <- LogEvent{ 120 - StepID: int(evt.LogLine.StepId), 121 - Stream: evt.LogLine.Stream, 122 - Content: evt.LogLine.Content, 123 - } 124 112 case *pb.ConnectRequest_ArtifactChunk: 125 113 // Persist artifact to disk; final jobs stream it back via StreamToRunner. 126 114 if s.artifacts != nil {