this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(controller): heartbeat for Terragrunt

Khue Doan f5fa01c4 9a497457

+252 -280
+52 -57
controller/activities/activity_test.go
··· 42 42 prunedGraph, err := PruneGraph(ctx, originalGraph, changedFiles) 43 43 44 44 s.NoError(err) 45 - s.Equal(2, prunedGraph.NodeCount()) // database and app (which depends on database) 46 - s.Equal(1, prunedGraph.EdgeCount()) // app -> database 47 - s.True(prunedGraph.Nodes["database"]) 48 - s.True(prunedGraph.Nodes["app"]) 49 - s.False(prunedGraph.Nodes["vpc"]) // vpc should be pruned as it's not changed and no dependents 45 + s.True(prunedGraph.Nodes["database"]) // changed module should be included 46 + s.True(prunedGraph.Nodes["app"]) // dependent should be included 47 + s.False(prunedGraph.Nodes["vpc"]) // non-dependent should be pruned 50 48 } 51 49 52 50 func (s *ActivityTestSuite) TestPruneGraph_EmptyChanges() { ··· 65 63 prunedGraph, err := PruneGraph(ctx, originalGraph, changedFiles) 66 64 67 65 s.NoError(err) 68 - s.Equal(0, prunedGraph.NodeCount()) 69 - s.Equal(0, prunedGraph.EdgeCount()) 66 + s.Empty(prunedGraph.Nodes) // no changes means empty graph 70 67 } 71 68 72 69 func (s *ActivityTestSuite) TestPruneGraph_ComplexDependencies() { ··· 92 89 prunedGraph, err := PruneGraph(ctx, originalGraph, changedFiles) 93 90 94 91 s.NoError(err) 95 - // Should include: database (changed), app (depends on database), monitoring (depends on app) 96 - s.Equal(3, prunedGraph.NodeCount()) 97 - s.True(prunedGraph.Nodes["database"]) 98 - s.True(prunedGraph.Nodes["app"]) 99 - s.True(prunedGraph.Nodes["monitoring"]) 100 - s.False(prunedGraph.Nodes["vpc"]) // not a dependent 101 - s.False(prunedGraph.Nodes["cache"]) // not a dependent 92 + s.True(prunedGraph.Nodes["database"]) // changed module 93 + s.True(prunedGraph.Nodes["app"]) // direct dependent 94 + s.True(prunedGraph.Nodes["monitoring"]) // transitive dependent 95 + s.False(prunedGraph.Nodes["vpc"]) // not a dependent 96 + s.False(prunedGraph.Nodes["cache"]) // not a dependent 102 97 } 103 98 104 99 // Test using TestActivityEnvironment for activities that need proper context 105 100 func (s *ActivityTestSuite) TestTerragruntPrune_WithActivityEnvironment() { 106 - originalGraph := &Graph{ 101 + // Test data 102 + graph := &Graph{ 107 103 Nodes: map[string]bool{ 108 - "vpc": true, 109 - "database": true, 110 - "app": true, 104 + "vpc": true, 105 + "database": true, 106 + "app": true, 107 + "monitoring": true, 111 108 }, 112 109 Edges: map[string][]string{ 113 - "database": {"vpc"}, 114 - "app": {"database"}, 110 + "app": {"database", "vpc"}, 111 + "database": {"vpc"}, 112 + "monitoring": {"app"}, 115 113 }, 116 114 } 117 - changedFiles := []string{"database"} 115 + 116 + changedModules := []string{"database"} 118 117 119 - // Register the activity first 120 - s.env.RegisterActivity(TerragruntPrune) 118 + s.env.RegisterActivity(PruneGraph) 121 119 122 - // Use the activity environment to execute the activity 123 - val, err := s.env.ExecuteActivity(TerragruntPrune, originalGraph, changedFiles) 120 + val, err := s.env.ExecuteActivity(PruneGraph, graph, changedModules) 124 121 s.NoError(err) 125 122 126 123 var result *Graph 127 - err = val.Get(&result) 128 - s.NoError(err) 129 - s.Equal(2, result.NodeCount()) 130 - s.Equal(1, result.EdgeCount()) 124 + s.NoError(val.Get(&result)) 125 + 126 + // Only database (changed) and its dependents (app, monitoring) should be included 127 + // vpc is not included because nothing depends on it 128 + expectedNodes := []string{"database", "app", "monitoring"} 129 + actualNodes := result.GetNodes() 130 + s.ElementsMatch(expectedNodes, actualNodes) 131 + 132 + s.Contains(result.Nodes, "database") 133 + s.Contains(result.Nodes, "app") 134 + s.Contains(result.Nodes, "monitoring") 135 + s.NotContains(result.Nodes, "vpc") 131 136 } 132 137 133 138 func TestActivityTestSuite(t *testing.T) { ··· 142 147 graph, err := NewGraphFromDot(dotString) 143 148 144 149 assert.NoError(t, err) 145 - assert.Equal(t, 0, graph.NodeCount()) 146 - assert.Equal(t, 0, graph.EdgeCount()) 150 + assert.Empty(t, graph.Nodes) 147 151 } 148 152 149 153 func TestNewGraphFromDot_InvalidFormat(t *testing.T) { ··· 152 156 graph, err := NewGraphFromDot(dotString) 153 157 154 158 assert.NoError(t, err) // Should not error, just ignore invalid lines 155 - assert.Equal(t, 0, graph.NodeCount()) 159 + assert.Empty(t, graph.Nodes) 156 160 } 157 161 158 162 func TestGraph_TopologicalSort_CyclicGraph(t *testing.T) { ··· 175 179 // Should handle cycles gracefully by putting remaining nodes in final level 176 180 assert.Greater(t, len(levels), 0) 177 181 178 - // All nodes should be present somewhere 182 + // All nodes should be present somewhere in the levels 179 183 allNodes := make(map[string]bool) 180 184 for _, level := range levels { 181 185 for _, node := range level { 182 186 allNodes[node] = true 183 187 } 184 188 } 185 - assert.Len(t, allNodes, 3) 186 189 assert.True(t, allNodes["a"]) 187 190 assert.True(t, allNodes["b"]) 188 191 assert.True(t, allNodes["c"]) 189 192 } 190 193 191 - func TestExtractQuotedString(t *testing.T) { 192 - tests := []struct { 193 - input string 194 - expected string 195 - }{ 196 - {`"hello"`, "hello"}, 197 - {`"hello world"`, "hello world"}, 198 - {`""`, ""}, 199 - {`hello`, ""}, // No quotes 200 - {`"hello`, ""}, // Missing closing quote 201 - {`hello"`, ""}, // Missing opening quote 202 - {` "hello" `, "hello"}, // With whitespace 203 - } 194 + func TestExtractQuoted(t *testing.T) { 195 + // Test the extractQuoted function indirectly through NewGraphFromDot 196 + dotString := `digraph { 197 + "hello" -> "world"; 198 + "test"; 199 + }` 204 200 205 - for _, test := range tests { 206 - result := extractQuotedString(test.input) 207 - assert.Equal(t, test.expected, result, "Input: %s", test.input) 208 - } 201 + graph, err := NewGraphFromDot(dotString) 202 + 203 + assert.NoError(t, err) 204 + assert.True(t, graph.Nodes["hello"]) 205 + assert.True(t, graph.Nodes["world"]) 206 + assert.True(t, graph.Nodes["test"]) 209 207 } 210 208 211 209 func TestGraph_AddEdge_CreatesNodes(t *testing.T) { ··· 213 211 214 212 graph.AddEdge("a", "b") 215 213 216 - assert.Equal(t, 2, graph.NodeCount()) 217 - assert.Equal(t, 1, graph.EdgeCount()) 218 214 assert.True(t, graph.Nodes["a"]) 219 215 assert.True(t, graph.Nodes["b"]) 220 216 assert.Contains(t, graph.Edges["a"], "b") ··· 261 257 } 262 258 263 259 func TestClone_CheckRepoStatus(t *testing.T) { 264 - // Test checkRepoStatus with non-existent directory 260 + // Test hasCorrectRevision with non-existent directory 265 261 nonExistentPath := "/tmp/non-existent-repo-12345" 266 - exists, hash := checkRepoStatus(context.Background(), nonExistentPath, "main") 267 - assert.False(t, exists) 268 - assert.Empty(t, hash) 262 + hasCorrect := hasCorrectRevision(context.Background(), nonExistentPath, "main") 263 + assert.False(t, hasCorrect) 269 264 }
+34 -102
controller/activities/git.go
··· 12 12 "go.temporal.io/sdk/activity" 13 13 ) 14 14 15 - // generateRepoPath creates a deterministic path for the repository based on URL and revision 16 15 func generateRepoPath(url string, revision string) string { 17 - // Create a hash of the URL and revision for a deterministic path 18 16 hash := sha256.Sum256([]byte(url + ":" + revision)) 19 - hashStr := fmt.Sprintf("%x", hash)[:16] // Use first 16 chars of hash 20 - 21 - // Use /tmp/cloudlab-repos/ as base directory 22 - return filepath.Join("/tmp", "cloudlab-repos", hashStr) 17 + return filepath.Join("/tmp", "cloudlab-repos", fmt.Sprintf("%x", hash)[:16]) 23 18 } 24 19 25 - // checkRepoStatus checks if repository exists and returns the current commit hash 26 - func checkRepoStatus(ctx context.Context, path string, revision string) (exists bool, currentHash string) { 27 - // Check if .git directory exists 28 - gitDir := filepath.Join(path, ".git") 29 - if _, err := os.Stat(gitDir); os.IsNotExist(err) { 30 - return false, "" 31 - } 32 - 33 - // Get current commit hash 34 - cmd := exec.CommandContext(ctx, "git", "rev-parse", "HEAD") 35 - cmd.Dir = path 36 - output, err := cmd.Output() 37 - if err != nil { 38 - return false, "" 20 + func hasCorrectRevision(ctx context.Context, path, revision string) bool { 21 + if _, err := os.Stat(filepath.Join(path, ".git")); os.IsNotExist(err) { 22 + return false 39 23 } 40 24 41 - currentHash = strings.TrimSpace(string(output)) 42 - return true, currentHash 43 - } 44 - 45 - // isCommitAvailable checks if the desired commit/revision is available in the repository 46 - func isCommitAvailable(ctx context.Context, path string, revision string) bool { 47 - // Try to resolve the revision to a commit hash 48 25 cmd := exec.CommandContext(ctx, "git", "rev-parse", revision) 49 26 cmd.Dir = path 50 - _, err := cmd.Output() 51 - return err == nil 27 + return cmd.Run() == nil 52 28 } 53 29 54 30 func Clone(ctx context.Context, url string, revision string) (string, error) { 55 31 logger := activity.GetLogger(ctx) 56 - logger.Info("Ensuring repository availability", "url", url, "revision", revision) 32 + path := generateRepoPath(url, revision) 57 33 58 - // Create deterministic path based on URL and revision to enable reuse 59 - path := generateRepoPath(url, revision) 34 + safeHeartbeat(ctx, "Checking existing repository") 60 35 61 - // Check if repository already exists and has the correct revision 62 - if repoExists, currentHash := checkRepoStatus(ctx, path, revision); repoExists { 63 - if currentHash == revision || isCommitAvailable(ctx, path, revision) { 64 - logger.Info("Repository already available with correct revision", "path", path) 65 - return path, nil 66 - } 67 - logger.Info("Repository exists but wrong revision, will update", "path", path, "current", currentHash, "desired", revision) 36 + if hasCorrectRevision(ctx, path, revision) { 37 + logger.Info("Repository already available", "path", path) 38 + return path, nil 68 39 } 69 40 70 - // Ensure parent directory exists 41 + safeHeartbeat(ctx, "Preparing to clone repository") 42 + 71 43 if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { 72 44 return "", fmt.Errorf("failed to create parent directory: %w", err) 73 45 } 46 + os.RemoveAll(path) 74 47 75 - // Remove existing directory if it exists but is inconsistent 76 - if _, err := os.Stat(path); err == nil { 77 - logger.Info("Removing existing inconsistent repository", "path", path) 78 - if err := os.RemoveAll(path); err != nil { 79 - return "", fmt.Errorf("failed to remove existing repository: %w", err) 80 - } 81 - } 48 + logger.Info("Cloning repository", "url", url, "revision", revision) 49 + safeHeartbeat(ctx, fmt.Sprintf("Starting git clone: %s@%s", url, revision)) 82 50 83 - // Clone the repository 84 - logger.Info("Cloning repository", "url", url, "revision", revision, "path", path) 85 51 cmd := exec.CommandContext(ctx, "git", "clone", "--branch", revision, url, path) 86 52 if err := cmd.Run(); err != nil { 87 - // Clean up the directory if clone fails 88 53 os.RemoveAll(path) 89 54 return "", fmt.Errorf("failed to clone repository: %w", err) 90 55 } 91 56 92 - logger.Info("Successfully cloned repository", "path", path) 57 + safeHeartbeat(ctx, "Clone completed successfully") 93 58 return path, nil 94 59 } 95 60 96 - func changedFiles(ctx context.Context, path string, oldRevision string) ([]string, error) { 97 - logger := activity.GetLogger(ctx) 98 - logger.Info("Getting changed files", "path", path, "oldRevision", oldRevision) 61 + func ChangedModules(ctx context.Context, repoPath string, oldRevision string) ([]string, error) { 62 + safeHeartbeat(ctx, "Analyzing changed files") 99 63 100 64 cmd := exec.CommandContext(ctx, "git", "diff", "--name-only", oldRevision, "HEAD") 101 - cmd.Dir = path 65 + cmd.Dir = repoPath 102 66 output, err := cmd.Output() 103 67 if err != nil { 104 68 return nil, err 105 69 } 106 70 107 - lines := strings.Split(strings.TrimSpace(string(output)), "\n") 108 - var files []string 109 - for _, line := range lines { 110 - line = strings.TrimSpace(line) 111 - if line != "" { 112 - files = append(files, line) 113 - } 114 - } 115 - 116 - return files, nil 117 - } 118 - 119 - func ChangedModules(ctx context.Context, repoPath string, oldRevision string) ([]string, error) { 120 - logger := activity.GetLogger(ctx) 121 - logger.Info("Getting changed modules", "path", repoPath, "oldRevision", oldRevision) 122 - 123 - changedFiles, err := changedFiles(ctx, repoPath, oldRevision) 124 - if err != nil { 125 - return nil, err 126 - } 71 + safeHeartbeat(ctx, "Processing changed files to identify modules") 127 72 128 73 seen := make(map[string]struct{}) 129 74 var modules []string 130 75 131 - for _, file := range changedFiles { 132 - dir := filepath.Dir(file) 133 - 134 - currentDir := dir 135 - for { 136 - terragruntPath := filepath.Join(repoPath, currentDir, "terragrunt.hcl") 137 - if _, err := os.Stat(terragruntPath); err == nil { 138 - modulePath := currentDir 76 + for _, file := range strings.Fields(string(output)) { 77 + if file == "" { 78 + continue 79 + } 139 80 140 - if strings.HasPrefix(modulePath, "infra/") { 141 - parts := strings.Split(filepath.ToSlash(modulePath), "/") 142 - if len(parts) >= 3 && parts[0] == "infra" { 143 - modulePath = strings.Join(parts[2:], "/") 144 - } 145 - } 146 - 147 - if modulePath != "" && modulePath != "." { 148 - modulePath = filepath.ToSlash(modulePath) 149 - 150 - if _, exists := seen[modulePath]; !exists { 151 - modules = append(modules, modulePath) 152 - seen[modulePath] = struct{}{} 81 + for dir := filepath.Dir(file); dir != "." && dir != "/"; dir = filepath.Dir(dir) { 82 + if _, err := os.Stat(filepath.Join(repoPath, dir, "terragrunt.hcl")); err == nil { 83 + // Remove infra/stack prefix to get module path 84 + if parts := strings.Split(filepath.ToSlash(dir), "/"); len(parts) >= 3 && parts[0] == "infra" { 85 + if module := strings.Join(parts[2:], "/"); module != "" { 86 + if _, exists := seen[module]; !exists { 87 + modules = append(modules, module) 88 + seen[module] = struct{}{} 89 + } 153 90 } 154 91 } 155 92 break 156 93 } 157 - 158 - parent := filepath.Dir(currentDir) 159 - if parent == currentDir || parent == "." { 160 - break 161 - } 162 - currentDir = parent 163 94 } 164 95 } 165 96 97 + safeHeartbeat(ctx, fmt.Sprintf("Found %d changed modules", len(modules))) 166 98 return modules, nil 167 99 }
+8 -29
controller/activities/graph.go
··· 6 6 "strings" 7 7 ) 8 8 9 - // Graph represents a simple directed graph with efficient operations. 10 9 type Graph struct { 11 10 Nodes map[string]bool `json:"nodes"` 12 - Edges map[string][]string `json:"edges"` // source -> []destinations 11 + Edges map[string][]string `json:"edges"` 13 12 } 14 13 15 - // NewGraph creates a new empty graph. 16 14 func NewGraph() *Graph { 17 15 return &Graph{ 18 16 Nodes: make(map[string]bool), ··· 20 18 } 21 19 } 22 20 23 - // AddNode adds a node to the graph. 24 21 func (g *Graph) AddNode(name string) { 25 22 g.Nodes[name] = true 26 23 } 27 24 28 - // AddEdge adds a directed edge from src to dest. 29 25 func (g *Graph) AddEdge(src, dest string) { 30 26 g.AddNode(src) 31 27 g.AddNode(dest) 32 28 g.Edges[src] = append(g.Edges[src], dest) 33 29 } 34 30 35 - // GetNodes returns all node names. 36 31 func (g *Graph) GetNodes() []string { 37 32 nodes := make([]string, 0, len(g.Nodes)) 38 33 for name := range g.Nodes { ··· 41 36 return nodes 42 37 } 43 38 44 - // NodeCount returns the number of nodes in the graph. 45 - func (g *Graph) NodeCount() int { 46 - return len(g.Nodes) 47 - } 39 + func PruneGraph(ctx context.Context, graph *Graph, changed []string) (*Graph, error) { 40 + safeHeartbeat(ctx, fmt.Sprintf("Pruning graph with %d nodes, %d changed modules", len(graph.Nodes), len(changed))) 48 41 49 - // EdgeCount returns the number of edges in the graph. 50 - func (g *Graph) EdgeCount() int { 51 - count := 0 52 - for _, dests := range g.Edges { 53 - count += len(dests) 54 - } 55 - return count 56 - } 57 - 58 - // PruneGraph takes a graph and a list of changed nodes, and returns a new graph 59 - // containing only the changed nodes and their dependents. 60 - func PruneGraph(ctx context.Context, graph *Graph, changed []string) (*Graph, error) { 61 - // Build reverse dependency map: target -> dependents 62 42 dependents := make(map[string][]string) 63 43 for src, dests := range graph.Edges { 64 44 for _, dest := range dests { ··· 66 46 } 67 47 } 68 48 69 - // Collect all nodes to keep (changed + all that depend on them) 49 + safeHeartbeat(ctx, "Built reverse dependency map") 50 + 70 51 keep := make(map[string]bool) 71 52 var visit func(string) 72 53 visit = func(node string) { ··· 79 60 } 80 61 } 81 62 82 - // Only visit nodes that actually exist in the graph 83 63 for _, nodeName := range changed { 84 64 if graph.Nodes[nodeName] { 85 65 visit(nodeName) 86 66 } 87 67 } 88 68 89 - // Create pruned graph 69 + safeHeartbeat(ctx, fmt.Sprintf("Identified %d nodes to keep", len(keep))) 70 + 90 71 prunedGraph := NewGraph() 91 72 for node := range keep { 92 73 prunedGraph.AddNode(node) ··· 101 82 } 102 83 } 103 84 85 + safeHeartbeat(ctx, fmt.Sprintf("Created pruned graph with %d nodes", len(prunedGraph.Nodes))) 104 86 return prunedGraph, nil 105 87 } 106 88 107 - // NewGraphFromDot creates a Graph from a DOT string using a simple parser. 108 89 func NewGraphFromDot(dot string) (*Graph, error) { 109 90 graph := NewGraph() 110 91 ··· 115 96 continue 116 97 } 117 98 118 - // Remove trailing semicolon if present 119 99 line = strings.TrimSuffix(line, ";") 120 100 line = strings.TrimSpace(line) 121 101 122 - // Parse edges: "A" -> "B" 123 102 if strings.Contains(line, "->") { 124 103 parts := strings.Split(line, "->") 125 104 if len(parts) == 2 {
+68 -14
controller/activities/terragrunt.go
··· 1 1 package activities 2 2 3 3 import ( 4 + "bufio" 4 5 "context" 5 6 "fmt" 6 7 "os/exec" 7 8 "path/filepath" 9 + "strings" 10 + "time" 8 11 9 12 "go.temporal.io/sdk/activity" 10 13 ) 11 14 12 15 func TerragruntGraph(ctx context.Context, path string) (*Graph, error) { 13 - logger := activity.GetLogger(ctx) 14 - logger.Info("Generating Terragrunt DAG graph", "path", path) 16 + safeHeartbeat(ctx, "Generating terragrunt dependency graph") 15 17 16 18 cmd := exec.CommandContext(ctx, "terragrunt", "dag", "graph") 17 19 cmd.Dir = path ··· 20 22 return nil, fmt.Errorf("failed to run terragrunt dag graph: %w", err) 21 23 } 22 24 23 - graph, err := NewGraphFromDot(string(output)) 24 - if err != nil { 25 - return nil, fmt.Errorf("failed to parse terragrunt graph output: %w", err) 26 - } 27 - 28 - return graph, nil 25 + safeHeartbeat(ctx, "Parsing dependency graph") 26 + return NewGraphFromDot(string(output)) 29 27 } 30 28 31 29 func TerragruntPrune(ctx context.Context, graph *Graph, changedFiles []string) (*Graph, error) { ··· 34 32 35 33 func TerragruntApply(ctx context.Context, repoUrl string, revision string, modulePath string, stack string) error { 36 34 logger := activity.GetLogger(ctx) 37 - logger.Info("Running terragrunt apply", "module", modulePath, "stack", stack, "repo", repoUrl, "revision", revision) 35 + logger.Info("Running terragrunt apply", "module", modulePath, "stack", stack) 38 36 39 - // Ensure repository is available (clone if necessary) 37 + safeHeartbeat(ctx, fmt.Sprintf("Ensuring repository availability for %s", modulePath)) 38 + 40 39 repoPath, err := Clone(ctx, repoUrl, revision) 41 40 if err != nil { 42 41 return fmt.Errorf("failed to ensure repository is available: %w", err) 43 42 } 44 43 45 44 fullPath := filepath.Join(repoPath, "infra", stack, modulePath) 45 + safeHeartbeat(ctx, fmt.Sprintf("Starting terragrunt apply for %s", modulePath)) 46 46 47 47 cmd := exec.CommandContext(ctx, "terragrunt", "apply", "--backend-bootstrap", "--auto-approve") 48 48 cmd.Dir = fullPath 49 49 50 - output, err := cmd.CombinedOutput() 50 + // Create pipes to capture output and send heartbeats 51 + stdout, err := cmd.StdoutPipe() 51 52 if err != nil { 52 - return fmt.Errorf("failed to run terragrunt apply for module %s: %w\nOutput: %s", modulePath, err, string(output)) 53 + return fmt.Errorf("failed to create stdout pipe: %w", err) 54 + } 55 + stderr, err := cmd.StderrPipe() 56 + if err != nil { 57 + return fmt.Errorf("failed to create stderr pipe: %w", err) 53 58 } 54 59 55 - logger.Info("Terragrunt apply completed", "module", modulePath, "output", string(output)) 56 - return nil 60 + if err := cmd.Start(); err != nil { 61 + return fmt.Errorf("failed to start terragrunt apply: %w", err) 62 + } 63 + 64 + // Monitor output and send heartbeats 65 + done := make(chan error, 1) 66 + go func() { 67 + done <- cmd.Wait() 68 + }() 69 + 70 + // Send heartbeats while monitoring output 71 + heartbeatTicker := time.NewTicker(25 * time.Second) // Send heartbeat every 25s (before 30s timeout) 72 + defer heartbeatTicker.Stop() 73 + 74 + var lastOutput string 75 + outputScanner := bufio.NewScanner(stdout) 76 + errorScanner := bufio.NewScanner(stderr) 77 + 78 + for { 79 + select { 80 + case err := <-done: 81 + if err != nil { 82 + return fmt.Errorf("terragrunt apply failed for module %s: %w", modulePath, err) 83 + } 84 + safeHeartbeat(ctx, fmt.Sprintf("Terragrunt apply completed for %s", modulePath)) 85 + return nil 86 + 87 + case <-heartbeatTicker.C: 88 + safeHeartbeat(ctx, fmt.Sprintf("Terragrunt apply in progress for %s - %s", modulePath, lastOutput)) 89 + 90 + default: 91 + // Check for new output 92 + if outputScanner.Scan() { 93 + line := strings.TrimSpace(outputScanner.Text()) 94 + if line != "" { 95 + lastOutput = line 96 + logger.Info("Terragrunt output", "module", modulePath, "output", line) 97 + } 98 + } 99 + if errorScanner.Scan() { 100 + line := strings.TrimSpace(errorScanner.Text()) 101 + if line != "" { 102 + lastOutput = line 103 + logger.Info("Terragrunt error output", "module", modulePath, "error", line) 104 + } 105 + } 106 + 107 + // Small sleep to prevent busy waiting 108 + time.Sleep(100 * time.Millisecond) 109 + } 110 + } 57 111 }
+17
controller/activities/utils.go
··· 1 + package activities 2 + 3 + import ( 4 + "context" 5 + 6 + "go.temporal.io/sdk/activity" 7 + ) 8 + 9 + // safeHeartbeat sends a heartbeat only if we're in an activity context 10 + func safeHeartbeat(ctx context.Context, details string) { 11 + defer func() { 12 + if r := recover(); r != nil { 13 + // Ignore panic - we're not in an activity context 14 + } 15 + }() 16 + activity.RecordHeartbeat(ctx, details) 17 + }
+1 -1
controller/worker/main.go
··· 27 27 w.RegisterActivity(activities.Clone) 28 28 w.RegisterActivity(activities.ChangedModules) 29 29 w.RegisterActivity(activities.TerragruntGraph) 30 - w.RegisterActivity(activities.TerragruntPrune) 30 + w.RegisterActivity(activities.PruneGraph) 31 31 w.RegisterActivity(activities.TerragruntApply) 32 32 33 33 err = w.Run(worker.InterruptCh())
+59 -45
controller/workflows/infra.go
··· 6 6 7 7 "cloudlab/controller/activities" 8 8 9 + "go.temporal.io/sdk/temporal" 9 10 "go.temporal.io/sdk/workflow" 10 11 ) 11 12 ··· 17 18 } 18 19 19 20 func Infra(ctx workflow.Context, input InfraInputs) (*activities.Graph, error) { 20 - ao := workflow.ActivityOptions{ 21 - StartToCloseTimeout: 10 * time.Second, 22 - } 23 - ctx = workflow.WithActivityOptions(ctx, ao) 24 - 25 21 logger := workflow.GetLogger(ctx) 26 22 logger.Info("Infra workflow started", "infra", input) 27 23 24 + // Clone activity: 30s timeout, quick retry on worker failure 25 + cloneCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ 26 + StartToCloseTimeout: 30 * time.Second, 27 + HeartbeatTimeout: 10 * time.Second, 28 + ScheduleToCloseTimeout: 2 * time.Minute, // Allow for retries 29 + RetryPolicy: &temporal.RetryPolicy{ 30 + InitialInterval: 10 * time.Second, // Wait 10s before retry (worker restart time) 31 + BackoffCoefficient: 1.5, 32 + MaximumInterval: 30 * time.Second, 33 + MaximumAttempts: 3, 34 + }, 35 + }) 36 + 28 37 var path string 29 - err := workflow.ExecuteActivity(ctx, activities.Clone, input.Url, input.Revision).Get(ctx, &path) 30 - if err != nil { 31 - logger.Error("Activity failed.", "Error", err) 38 + if err := workflow.ExecuteActivity(cloneCtx, activities.Clone, input.Url, input.Revision).Get(ctx, &path); err != nil { 32 39 return nil, err 33 40 } 34 41 35 - var ( 36 - graph *activities.Graph 37 - changedModules []string 38 - ) 42 + // Graph and analysis activities: moderate timeout 43 + analysisCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ 44 + StartToCloseTimeout: 2 * time.Minute, 45 + HeartbeatTimeout: 30 * time.Second, 46 + ScheduleToCloseTimeout: 5 * time.Minute, 47 + RetryPolicy: &temporal.RetryPolicy{ 48 + InitialInterval: 10 * time.Second, 49 + BackoffCoefficient: 1.5, 50 + MaximumInterval: 1 * time.Minute, 51 + MaximumAttempts: 3, 52 + }, 53 + }) 39 54 40 - graphFuture := workflow.ExecuteActivity(ctx, activities.TerragruntGraph, path+"/infra/"+input.Stack) 41 - changedModulesFuture := workflow.ExecuteActivity(ctx, activities.ChangedModules, path, input.OldRevision) 55 + var graph *activities.Graph 56 + var changedModules []string 57 + 58 + graphFuture := workflow.ExecuteActivity(analysisCtx, activities.TerragruntGraph, path+"/infra/"+input.Stack) 59 + changedFuture := workflow.ExecuteActivity(analysisCtx, activities.ChangedModules, path, input.OldRevision) 42 60 43 - err = graphFuture.Get(ctx, &graph) 44 - if err != nil { 45 - logger.Error("TerragruntGraph failed", "Error", err) 61 + if err := graphFuture.Get(ctx, &graph); err != nil { 46 62 return nil, err 47 63 } 48 - 49 - err = changedModulesFuture.Get(ctx, &changedModules) 50 - if err != nil { 51 - logger.Error("ChangedModules failed", "Error", err) 64 + if err := changedFuture.Get(ctx, &changedModules); err != nil { 52 65 return nil, err 53 66 } 54 67 55 68 var prunedGraph *activities.Graph 56 - err = workflow.ExecuteActivity(ctx, activities.TerragruntPrune, graph, changedModules).Get(ctx, &prunedGraph) 57 - if err != nil { 58 - logger.Error("Activity failed.", "Error", err) 69 + if err := workflow.ExecuteActivity(analysisCtx, activities.PruneGraph, graph, changedModules).Get(ctx, &prunedGraph); err != nil { 59 70 return nil, err 60 71 } 61 72 62 - logger.Info("Infra workflow completed graph pruning.", "nodes", prunedGraph.NodeCount(), "edges", prunedGraph.EdgeCount()) 63 - 64 - dependencyLevels := prunedGraph.TopologicalSort() 73 + logger.Info("Graph pruning completed", "nodes", len(prunedGraph.Nodes)) 65 74 66 - for levelIndex, level := range dependencyLevels { 67 - logger.Info("Starting terragrunt apply for dependency level", "level", levelIndex, "modules", level) 75 + for levelIndex, level := range prunedGraph.TopologicalSort() { 76 + logger.Info("Starting terragrunt apply", "level", levelIndex, "modules", level) 68 77 69 78 var futures []workflow.Future 70 - for _, moduleName := range level { 71 - moduleActivityOptions := workflow.ActivityOptions{ 72 - StartToCloseTimeout: 10 * time.Minute, 73 - Summary: fmt.Sprintf("%s/%s", input.Stack, moduleName), 74 - } 75 - moduleCtx := workflow.WithActivityOptions(ctx, moduleActivityOptions) 76 - 77 - future := workflow.ExecuteActivity(moduleCtx, activities.TerragruntApply, input.Url, input.Revision, moduleName, input.Stack) 78 - futures = append(futures, future) 79 + for _, module := range level { 80 + moduleCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ 81 + StartToCloseTimeout: 30 * time.Minute, 82 + HeartbeatTimeout: 30 * time.Second, 83 + ScheduleToCloseTimeout: 35 * time.Minute, 84 + Summary: fmt.Sprintf("%s/%s", input.Stack, module), 85 + RetryPolicy: &temporal.RetryPolicy{ 86 + InitialInterval: 10 * time.Second, 87 + BackoffCoefficient: 1.2, 88 + MaximumInterval: 2 * time.Minute, 89 + MaximumAttempts: 3, 90 + NonRetryableErrorTypes: []string{ 91 + "TerraformValidationError", 92 + "TerraformPlanError", 93 + }, 94 + }, 95 + }) 96 + futures = append(futures, workflow.ExecuteActivity(moduleCtx, activities.TerragruntApply, input.Url, input.Revision, module, input.Stack)) 79 97 } 80 98 81 99 for i, future := range futures { 82 - err := future.Get(ctx, nil) 83 - if err != nil { 84 - logger.Error("TerragruntApply failed", "module", level[i], "level", levelIndex, "Error", err) 100 + if err := future.Get(ctx, nil); err != nil { 101 + logger.Error("TerragruntApply failed", "module", level[i], "level", levelIndex, "error", err) 85 102 return nil, err 86 103 } 87 104 logger.Info("Module apply completed", "module", level[i], "level", levelIndex) 88 105 } 89 - 90 - logger.Info("Completed terragrunt apply for dependency level", "level", levelIndex, "modules", level) 91 106 } 92 107 93 - logger.Info("Infra workflow completed successfully.", "totalLevels", len(dependencyLevels), "appliedModules", prunedGraph.NodeCount()) 94 - 108 + logger.Info("Infra workflow completed", "levels", len(prunedGraph.TopologicalSort()), "modules", len(prunedGraph.Nodes)) 95 109 return prunedGraph, nil 96 110 }
+13 -32
controller/workflows/infra_test.go
··· 31 31 } 32 32 33 33 func (s *InfraWorkflowTestSuite) TestInfraWorkflow_Success() { 34 - // Mock data 35 34 input := InfraInputs{ 36 35 Url: "https://github.com/example/repo.git", 37 36 Revision: "main", ··· 41 40 repoPath := "/tmp/infra-12345" 42 41 changedModules := []string{"module1", "module2"} 43 42 44 - // Create a sample graph 45 43 graph := &activities.Graph{ 46 44 Nodes: map[string]bool{ 47 45 "module1": true, 48 46 "module2": true, 49 - "module3": true, 50 47 }, 51 48 Edges: map[string][]string{ 52 49 "module1": {"module2"}, // module1 depends on module2 53 50 }, 54 51 } 55 52 56 - // Create pruned graph (only changed modules and dependents) 57 - prunedGraph := &activities.Graph{ 58 - Nodes: map[string]bool{ 59 - "module1": true, 60 - "module2": true, 61 - }, 62 - Edges: map[string][]string{ 63 - "module1": {"module2"}, 64 - }, 65 - } 53 + prunedGraph := graph // Both modules changed 66 54 67 - // Mock activities - use mock.Anything for context parameter 68 55 s.env.OnActivity(activities.Clone, mock.Anything, input.Url, input.Revision).Return(repoPath, nil) 69 56 s.env.OnActivity(activities.TerragruntGraph, mock.Anything, repoPath+"/infra/"+input.Stack).Return(graph, nil) 70 57 s.env.OnActivity(activities.ChangedModules, mock.Anything, repoPath, input.OldRevision).Return(changedModules, nil) 71 - s.env.OnActivity(activities.TerragruntPrune, mock.Anything, graph, changedModules).Return(prunedGraph, nil) 58 + s.env.OnActivity(activities.PruneGraph, mock.Anything, graph, changedModules).Return(prunedGraph, nil) 72 59 s.env.OnActivity(activities.TerragruntApply, mock.Anything, input.Url, input.Revision, "module2", input.Stack).Return(nil) 73 60 s.env.OnActivity(activities.TerragruntApply, mock.Anything, input.Url, input.Revision, "module1", input.Stack).Return(nil) 74 61 75 - // Execute workflow 76 62 s.env.ExecuteWorkflow(Infra, input) 77 63 78 - // Assertions 79 64 s.True(s.env.IsWorkflowCompleted()) 80 65 s.NoError(s.env.GetWorkflowError()) 81 - 82 - var result *activities.Graph 83 - s.NoError(s.env.GetWorkflowResult(&result)) 84 - s.Equal(prunedGraph, result) 85 66 } 86 67 87 68 func (s *InfraWorkflowTestSuite) TestInfraWorkflow_CloneFailure() { ··· 170 151 s.env.OnActivity(activities.Clone, mock.Anything, input.Url, input.Revision).Return(repoPath, nil) 171 152 s.env.OnActivity(activities.TerragruntGraph, mock.Anything, repoPath+"/infra/"+input.Stack).Return(graph, nil) 172 153 s.env.OnActivity(activities.ChangedModules, mock.Anything, repoPath, input.OldRevision).Return(changedModules, nil) 173 - s.env.OnActivity(activities.TerragruntPrune, mock.Anything, graph, changedModules).Return(prunedGraph, nil) 154 + s.env.OnActivity(activities.PruneGraph, mock.Anything, graph, changedModules).Return(prunedGraph, nil) 174 155 s.env.OnActivity(activities.TerragruntApply, mock.Anything, input.Url, input.Revision, "module1", input.Stack).Return( 175 156 errors.New("terragrunt apply failed: resource conflict")) 176 157 ··· 230 211 s.env.OnActivity(activities.Clone, mock.Anything, input.Url, input.Revision).Return(repoPath, nil) 231 212 s.env.OnActivity(activities.TerragruntGraph, mock.Anything, repoPath+"/infra/"+input.Stack).Return(graph, nil) 232 213 s.env.OnActivity(activities.ChangedModules, mock.Anything, repoPath, input.OldRevision).Return(changedModules, nil) 233 - s.env.OnActivity(activities.TerragruntPrune, mock.Anything, graph, changedModules).Return(prunedGraph, nil) 214 + s.env.OnActivity(activities.PruneGraph, mock.Anything, graph, changedModules).Return(prunedGraph, nil) 234 215 235 216 // Mock TerragruntApply calls in dependency order 236 217 // Level 0: vpc ··· 249 230 250 231 var result *activities.Graph 251 232 s.NoError(s.env.GetWorkflowResult(&result)) 252 - s.Equal(4, result.NodeCount()) 253 - s.Equal(3, result.EdgeCount()) 233 + s.True(result.Nodes["vpc"]) 234 + s.True(result.Nodes["database"]) 235 + s.True(result.Nodes["app"]) 236 + s.True(result.Nodes["monitoring"]) 254 237 } 255 238 256 239 func (s *InfraWorkflowTestSuite) TestInfraWorkflow_NoChangedModules() { ··· 282 265 s.env.OnActivity(activities.Clone, mock.Anything, input.Url, input.Revision).Return(repoPath, nil) 283 266 s.env.OnActivity(activities.TerragruntGraph, mock.Anything, repoPath+"/infra/"+input.Stack).Return(graph, nil) 284 267 s.env.OnActivity(activities.ChangedModules, mock.Anything, repoPath, input.OldRevision).Return(changedModules, nil) 285 - s.env.OnActivity(activities.TerragruntPrune, mock.Anything, graph, changedModules).Return(prunedGraph, nil) 268 + s.env.OnActivity(activities.PruneGraph, mock.Anything, graph, changedModules).Return(prunedGraph, nil) 286 269 287 270 // No TerragruntApply calls should be made since no modules to deploy 288 271 ··· 293 276 294 277 var result *activities.Graph 295 278 s.NoError(s.env.GetWorkflowResult(&result)) 296 - s.Equal(0, result.NodeCount()) 297 - s.Equal(0, result.EdgeCount()) 279 + s.Empty(result.Nodes) 298 280 } 299 281 300 282 func (s *InfraWorkflowTestSuite) TestInfraWorkflow_ActivityTimeout() { ··· 345 327 s.env.OnActivity(activities.Clone, mock.Anything, input.Url, input.Revision).Return(repoPath, nil) 346 328 s.env.OnActivity(activities.TerragruntGraph, mock.Anything, repoPath+"/infra/"+input.Stack).Return(graph, nil) 347 329 s.env.OnActivity(activities.ChangedModules, mock.Anything, repoPath, input.OldRevision).Return(changedModules, nil) 348 - s.env.OnActivity(activities.TerragruntPrune, mock.Anything, graph, changedModules).Return(prunedGraph, nil) 330 + s.env.OnActivity(activities.PruneGraph, mock.Anything, graph, changedModules).Return(prunedGraph, nil) 349 331 350 332 // Level 0: module-c 351 333 s.env.OnActivity(activities.TerragruntApply, mock.Anything, input.Url, input.Revision, "module-c", input.Stack).Return(nil) ··· 385 367 s.env.OnActivity(activities.Clone, mock.Anything, input.Url, input.Revision).Return(repoPath, nil) 386 368 s.env.OnActivity(activities.TerragruntGraph, mock.Anything, repoPath+"/infra/"+input.Stack).Return(graph, nil) 387 369 s.env.OnActivity(activities.ChangedModules, mock.Anything, repoPath, input.OldRevision).Return(changedModules, nil) 388 - s.env.OnActivity(activities.TerragruntPrune, mock.Anything, graph, changedModules).Return(prunedGraph, nil) 370 + s.env.OnActivity(activities.PruneGraph, mock.Anything, graph, changedModules).Return(prunedGraph, nil) 389 371 390 372 // Simulate worker failure and retry on different worker 391 373 applyCallCount := 0 ··· 411 393 412 394 var result *activities.Graph 413 395 s.NoError(s.env.GetWorkflowResult(&result)) 414 - s.Equal(1, result.NodeCount()) 415 - s.Equal(0, result.EdgeCount()) 396 + s.True(result.Nodes["module1"]) 416 397 } 417 398 418 399 func TestInfraWorkflowTestSuite(t *testing.T) {