this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: topological sort and Terragrunt apply

+256 -6
+2
compose.yaml
··· 14 14 start_period: 5s 15 15 worker: 16 16 build: ./controller 17 + volumes: 18 + - /var/run/docker.sock:/var/run/docker.sock 17 19 environment: 18 20 TEMPORAL_HOST: temporal:7233 19 21 depends_on:
+1 -1
controller/activities/git.go
··· 107 107 } 108 108 109 109 seen := make(map[string]struct{}) 110 - var modules []string 110 + modules := make([]string, 0) 111 111 112 112 for _, file := range changedFiles { 113 113 // Get the directory of the changed file
+1 -2
controller/activities/git_test.go
··· 1 1 package activities 2 2 3 3 import ( 4 - "context" 5 4 "os" 6 5 "path/filepath" 7 6 "reflect" ··· 133 132 // Helper function to simulate ChangedModules logic without Git 134 133 func getChangedModulesFromFiles(repoPath string, changedFiles []string) []string { 135 134 seen := make(map[string]struct{}) 136 - var modules []string 135 + modules := make([]string, 0) // Initialize as empty slice instead of nil 137 136 138 137 for _, file := range changedFiles { 139 138 // Get the directory of the changed file
+70
controller/activities/graph.go
··· 137 137 b.WriteString("}") 138 138 return b.String() 139 139 } 140 + 141 + // TopologicalSort returns modules grouped by dependency levels. 142 + // Modules in the same level can be executed in parallel. 143 + // Each level must complete before the next level can start. 144 + // An edge from A to B means A depends on B, so B must run before A. 145 + func (g *Graph) TopologicalSort() [][]string { 146 + // Build adjacency list and in-degree count 147 + adjList := make(map[string][]string) 148 + inDegree := make(map[string]int) 149 + 150 + // Initialize all nodes with in-degree 0 151 + for _, node := range g.Nodes { 152 + inDegree[node.Name] = 0 153 + adjList[node.Name] = []string{} 154 + } 155 + 156 + // Build the graph and calculate in-degrees 157 + // Edge from Src to Dest means Src depends on Dest 158 + // So Dest should run before Src 159 + for _, edge := range g.Edges { 160 + adjList[edge.Dest] = append(adjList[edge.Dest], edge.Src) 161 + inDegree[edge.Src]++ 162 + } 163 + 164 + var levels [][]string 165 + remaining := make(map[string]bool) 166 + for _, node := range g.Nodes { 167 + remaining[node.Name] = true 168 + } 169 + 170 + // Process nodes level by level 171 + for len(remaining) > 0 { 172 + var currentLevel []string 173 + 174 + // Find all nodes with in-degree 0 (no dependencies) 175 + for nodeName := range remaining { 176 + if inDegree[nodeName] == 0 { 177 + currentLevel = append(currentLevel, nodeName) 178 + } 179 + } 180 + 181 + // If no nodes found with in-degree 0, there's a cycle 182 + if len(currentLevel) == 0 { 183 + // Return remaining nodes as the final level to handle cycles gracefully 184 + var cycleNodes []string 185 + for nodeName := range remaining { 186 + cycleNodes = append(cycleNodes, nodeName) 187 + } 188 + if len(cycleNodes) > 0 { 189 + levels = append(levels, cycleNodes) 190 + } 191 + break 192 + } 193 + 194 + // Add current level 195 + levels = append(levels, currentLevel) 196 + 197 + // Remove processed nodes and update in-degrees 198 + for _, nodeName := range currentLevel { 199 + delete(remaining, nodeName) 200 + for _, dependent := range adjList[nodeName] { 201 + if remaining[dependent] { 202 + inDegree[dependent]-- 203 + } 204 + } 205 + } 206 + } 207 + 208 + return levels 209 + }
+118
controller/activities/graph_test.go
··· 330 330 t.Errorf("Expected edges %v, but got %v", expectedEdges, prunedEdges) 331 331 } 332 332 } 333 + 334 + func TestTopologicalSort(t *testing.T) { 335 + testCases := []struct { 336 + name string 337 + nodes []string 338 + edges []Edge 339 + expectedLevels [][]string 340 + }{ 341 + { 342 + name: "Simple linear dependency", 343 + nodes: []string{"A", "B", "C"}, 344 + edges: []Edge{ 345 + {Src: "B", Dest: "A"}, 346 + {Src: "C", Dest: "B"}, 347 + }, 348 + expectedLevels: [][]string{ 349 + {"A"}, 350 + {"B"}, 351 + {"C"}, 352 + }, 353 + }, 354 + { 355 + name: "Parallel dependencies", 356 + nodes: []string{"A", "B", "C", "D"}, 357 + edges: []Edge{ 358 + {Src: "C", Dest: "A"}, 359 + {Src: "C", Dest: "B"}, 360 + {Src: "D", Dest: "C"}, 361 + }, 362 + expectedLevels: [][]string{ 363 + {"A", "B"}, 364 + {"C"}, 365 + {"D"}, 366 + }, 367 + }, 368 + { 369 + name: "Complex dependency graph", 370 + nodes: []string{"A", "B", "C", "D", "E", "F"}, 371 + edges: []Edge{ 372 + {Src: "C", Dest: "A"}, 373 + {Src: "C", Dest: "B"}, 374 + {Src: "D", Dest: "C"}, 375 + {Src: "E", Dest: "C"}, 376 + {Src: "F", Dest: "D"}, 377 + {Src: "F", Dest: "E"}, 378 + }, 379 + expectedLevels: [][]string{ 380 + {"A", "B"}, 381 + {"C"}, 382 + {"D", "E"}, 383 + {"F"}, 384 + }, 385 + }, 386 + { 387 + name: "No dependencies", 388 + nodes: []string{"A", "B", "C"}, 389 + edges: []Edge{}, 390 + expectedLevels: [][]string{{"A", "B", "C"}}, 391 + }, 392 + { 393 + name: "Single node", 394 + nodes: []string{"A"}, 395 + edges: []Edge{}, 396 + expectedLevels: [][]string{{"A"}}, 397 + }, 398 + { 399 + name: "Real world example: bootstrap depends on cluster", 400 + nodes: []string{"bootstrap", "cluster"}, 401 + edges: []Edge{ 402 + {Src: "bootstrap", Dest: "cluster"}, 403 + }, 404 + expectedLevels: [][]string{ 405 + {"cluster"}, 406 + {"bootstrap"}, 407 + }, 408 + }, 409 + } 410 + 411 + for _, tc := range testCases { 412 + t.Run(tc.name, func(t *testing.T) { 413 + // Create graph 414 + graph := &Graph{ 415 + Nodes: make([]*Node, len(tc.nodes)), 416 + Edges: make([]*Edge, len(tc.edges)), 417 + } 418 + 419 + for i, nodeName := range tc.nodes { 420 + graph.Nodes[i] = &Node{Name: nodeName} 421 + } 422 + 423 + for i, edge := range tc.edges { 424 + graph.Edges[i] = &Edge{Src: edge.Src, Dest: edge.Dest} 425 + } 426 + 427 + // Get topological sort 428 + levels := graph.TopologicalSort() 429 + 430 + // Verify number of levels 431 + if len(levels) != len(tc.expectedLevels) { 432 + t.Errorf("Expected %d levels, but got %d", len(tc.expectedLevels), len(levels)) 433 + return 434 + } 435 + 436 + // Verify each level 437 + for levelIndex, expectedLevel := range tc.expectedLevels { 438 + actualLevel := levels[levelIndex] 439 + 440 + // Sort both slices for comparison 441 + sort.Strings(expectedLevel) 442 + sort.Strings(actualLevel) 443 + 444 + if !reflect.DeepEqual(actualLevel, expectedLevel) { 445 + t.Errorf("Level %d: expected %v, but got %v", levelIndex, expectedLevel, actualLevel) 446 + } 447 + } 448 + }) 449 + } 450 + }
+22 -2
controller/activities/terragrunt.go
··· 4 4 "context" 5 5 "fmt" 6 6 "os/exec" 7 + "path/filepath" 7 8 8 9 "go.temporal.io/sdk/activity" 9 10 ) ··· 27 28 return graph, nil 28 29 } 29 30 30 - func TerragruntGraphShaking(ctx context.Context, graph *Graph, changedModules []string) (*Graph, error) { 31 + func TerragruntGraphShaking(ctx context.Context, graph *Graph, changedFiles []string) (*Graph, error) { 31 32 logger := activity.GetLogger(ctx) 32 33 33 34 logger.Info("Pruning Terragrunt DAG graph") 34 35 35 - prunedGraph, err := PruneGraph(ctx, graph, changedModules) 36 + prunedGraph, err := PruneGraph(ctx, graph, changedFiles) 36 37 if err != nil { 37 38 return nil, fmt.Errorf("failed to prune dependency graph: %w", err) 38 39 } 39 40 40 41 return prunedGraph, nil 41 42 } 43 + 44 + func TerragruntApply(ctx context.Context, repoPath string, modulePath string, stack string) error { 45 + logger := activity.GetLogger(ctx) 46 + logger.Info("Running terragrunt apply", "module", modulePath, "stack", stack) 47 + 48 + // Construct the full path to the module 49 + fullPath := filepath.Join(repoPath, "infra", stack, modulePath) 50 + 51 + cmd := exec.CommandContext(ctx, "terragrunt", "apply", "--auto-approve") 52 + cmd.Dir = fullPath 53 + 54 + output, err := cmd.CombinedOutput() 55 + if err != nil { 56 + return fmt.Errorf("failed to run terragrunt apply for module %s: %w\nOutput: %s", modulePath, err, string(output)) 57 + } 58 + 59 + logger.Info("Terragrunt apply completed", "module", modulePath, "output", string(output)) 60 + return nil 61 + }
+1
controller/worker/main.go
··· 27 27 w.RegisterActivity(activities.ChangedModules) 28 28 w.RegisterActivity(activities.TerragruntGraph) 29 29 w.RegisterActivity(activities.TerragruntGraphShaking) 30 + w.RegisterActivity(activities.TerragruntApply) 30 31 31 32 err = w.Run(worker.InterruptCh()) 32 33 if err != nil {
+41 -1
controller/workflows/infra.go
··· 1 1 package workflows 2 2 3 3 import ( 4 + "fmt" 5 + "strings" 4 6 "time" 5 7 6 8 "cloudlab/controller/activities" ··· 69 71 return nil, err 70 72 } 71 73 72 - logger.Info("Infra workflow completed.", "nodes", len(prunedGraph.Nodes), "edges", len(prunedGraph.Edges)) 74 + logger.Info("Infra workflow completed graph pruning.", "nodes", len(prunedGraph.Nodes), "edges", len(prunedGraph.Edges)) 75 + 76 + // Get dependency levels for parallel execution 77 + dependencyLevels := prunedGraph.TopologicalSort() 78 + 79 + // Execute terragrunt apply for each level in dependency order 80 + for levelIndex, level := range dependencyLevels { 81 + logger.Info("Starting terragrunt apply for dependency level", "level", levelIndex, "modules", level) 82 + 83 + // Create futures for parallel execution within this level 84 + var futures []workflow.Future 85 + for _, moduleName := range level { 86 + // Create activity options with custom activity ID that includes module name 87 + // Replace slashes with hyphens for ActivityID compatibility 88 + safeModuleName := strings.ReplaceAll(moduleName, "/", "-") 89 + moduleActivityOptions := workflow.ActivityOptions{ 90 + StartToCloseTimeout: 10 * time.Minute, 91 + ActivityID: fmt.Sprintf("TerragruntApply-%s", safeModuleName), 92 + } 93 + moduleCtx := workflow.WithActivityOptions(ctx, moduleActivityOptions) 94 + 95 + future := workflow.ExecuteActivity(moduleCtx, activities.TerragruntApply, path, moduleName, input.Stack) 96 + futures = append(futures, future) 97 + } 98 + 99 + // Wait for all modules in this level to complete 100 + for i, future := range futures { 101 + err := future.Get(ctx, nil) 102 + if err != nil { 103 + logger.Error("TerragruntApply failed", "module", level[i], "level", levelIndex, "Error", err) 104 + return nil, err 105 + } 106 + logger.Info("Module apply completed", "module", level[i], "level", levelIndex) 107 + } 108 + 109 + logger.Info("Completed terragrunt apply for dependency level", "level", levelIndex, "modules", level) 110 + } 111 + 112 + logger.Info("Infra workflow completed successfully.", "totalLevels", len(dependencyLevels), "appliedModules", len(prunedGraph.Nodes)) 73 113 74 114 return prunedGraph, nil 75 115 }