Deployment and lifecycle management for Nix
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

agent/server: improve testing/logging around deployments

+405 -11
+8 -1
.iex-server.exs
··· 1 1 Application.ensure_all_started([:sower]) 2 - 2 + if Code.loaded?(Sower.Accounts.Organization) do 3 + Sower.Accounts.Organization.list() 4 + |> List.first() 5 + |> Map.get(:org_id) 6 + |> Sower.Repo.put_org_id() 7 + else 8 + Application.ensure_all_started([:erlexec, :exsync]) 9 + end 3 10 IEx.configure( 4 11 inspect: [ 5 12 pretty: true,
+59 -2
apps/sower/lib/sower/orchestration.ex
··· 917 917 Task.Supervisor.start_child(Sower.TaskSupervisor, fn -> 918 918 Repo.put_org_id(agent.org_id) 919 919 920 + Logger.info( 921 + msg: "Deployment processing started", 922 + request_id: request_id, 923 + agent_id: agent.id 924 + ) 925 + 920 926 case do_deployment(request_id, subscriptions, opts) do 921 927 {:ok, deployment} -> 928 + Logger.info( 929 + msg: "Deployment broadcast successful", 930 + request_id: request_id, 931 + deployment_sid: deployment.sid, 932 + skipped: deployment.skipped 933 + ) 934 + 922 935 SowerWeb.Endpoint.broadcast( 923 936 "agent:#{agent.sid}", 924 937 "deployment", ··· 926 939 ) 927 940 928 941 {:error, reason} -> 942 + Logger.error( 943 + msg: "Deployment processing failed", 944 + request_id: request_id, 945 + reason: to_string(reason) 946 + ) 947 + 929 948 SowerWeb.Endpoint.broadcast( 930 949 "agent:#{agent.sid}", 931 950 "deployment:error", ··· 962 981 seeds = Enum.map(seed_deploys, & &1.seed) 963 982 964 983 if seeds == [] do 984 + Logger.warning( 985 + msg: "No matching seeds found for deployment request", 986 + request_id: request_id, 987 + subscription_count: length(subscriptions) 988 + ) 989 + 965 990 {:error, :seeds_not_found} 966 991 else 967 992 content_hash = compute_content_hash(seeds) 968 993 994 + Logger.debug( 995 + msg: "Processing deployment with matched seeds", 996 + request_id: request_id, 997 + seed_count: length(seeds), 998 + content_hash: content_hash 999 + ) 1000 + 969 1001 case find_duplicate_deployment(agent_id, content_hash, force?) do 970 1002 {:skip, existing} -> 971 1003 existing = Repo.preload(existing, [:seeds]) 1004 + 1005 + Logger.info( 1006 + msg: "Skipping deployment - duplicate found", 1007 + request_id: request_id, 1008 + deployment_sid: existing.sid, 1009 + content_hash: content_hash 1010 + ) 972 1011 973 1012 {:ok, 974 1013 %SowerClient.Orchestration.Deployment{ ··· 979 1018 }} 980 1019 981 1020 :proceed -> 1021 + Logger.debug( 1022 + msg: "Creating new deployment record", 1023 + request_id: request_id, 1024 + agent_id: agent_id 1025 + ) 1026 + 982 1027 case create_deployment(%{ 983 1028 agent_id: agent_id, 984 1029 content_hash: content_hash, ··· 986 1031 subscriptions: subscriptions 987 1032 }) do 988 1033 {:ok, deploy} -> 1034 + Logger.info( 1035 + msg: "Deployment record created successfully", 1036 + request_id: request_id, 1037 + deployment_sid: deploy.sid 1038 + ) 1039 + 989 1040 {:ok, 990 1041 %SowerClient.Orchestration.Deployment{ 991 1042 request_id: request_id, ··· 994 1045 skipped: false 995 1046 }} 996 1047 997 - other -> 998 - other 1048 + {:error, reason} -> 1049 + Logger.error( 1050 + msg: "Failed to create deployment record", 1051 + request_id: request_id, 1052 + reason: inspect(reason) 1053 + ) 1054 + 1055 + {:error, reason} 999 1056 end 1000 1057 end 1001 1058 end
+87
apps/sower/test/sower/orchestration_test.exs
··· 901 901 end 902 902 end 903 903 904 + describe "handle_deployment_request/2" do 905 + import Sower.OrchestrationFixtures 906 + 907 + test "returns immediate request_id for valid deployment request", %{organization: _org} do 908 + agent = agent_fixture() 909 + _seed = seed_fixture(%{name: "testhost", seed_type: "nixos"}) 910 + 911 + subscription = 912 + subscription_fixture(%{ 913 + agent_id: agent.id, 914 + seed_name: "testhost", 915 + seed_type: "nixos" 916 + }) 917 + 918 + # Payload with provided request_id (as the client would generate) 919 + payload = %{ 920 + "subscription_sids" => [subscription.sid], 921 + "request_id" => "req_test_#{System.unique_integer([:positive])}", 922 + "force" => false 923 + } 924 + 925 + assert {:ok, request_id} = Orchestration.handle_deployment_request(payload, agent) 926 + assert is_binary(request_id) 927 + end 928 + 929 + test "returns error for deployment request with unauthorized subscription", %{ 930 + organization: _org 931 + } do 932 + agent1 = agent_fixture() 933 + agent2 = agent_fixture() 934 + 935 + # Create subscription for agent1 936 + subscription = 937 + subscription_fixture(%{ 938 + agent_id: agent1.id, 939 + seed_name: "testhost", 940 + seed_type: "nixos" 941 + }) 942 + 943 + # Try to use agent2's subscription with agent1's context (should fail) 944 + payload = %{ 945 + "subscription_sids" => [subscription.sid], 946 + "force" => false 947 + } 948 + 949 + # This should be rejected because agent2 doesn't own the subscription 950 + result = Orchestration.handle_deployment_request(payload, agent2) 951 + assert result == {:error, :unauthorized} 952 + end 953 + 954 + test "process_deployment returns request_id and starts async task", %{organization: _org} do 955 + agent = agent_fixture() 956 + _seed = seed_fixture(%{name: "testhost", seed_type: "nixos"}) 957 + 958 + subscription = 959 + subscription_fixture(%{ 960 + agent_id: agent.id, 961 + seed_name: "testhost", 962 + seed_type: "nixos" 963 + }) 964 + 965 + request_id = "dr_test_#{System.unique_integer([:positive])}" 966 + 967 + # process_deployment should return immediately with the request_id 968 + assert {:ok, ^request_id} = 969 + Orchestration.process_deployment(request_id, [subscription], agent) 970 + end 971 + 972 + test "process_deployment handles error case with no matching seeds", %{organization: _org} do 973 + agent = agent_fixture() 974 + 975 + # Create subscription with no matching seed 976 + subscription = 977 + subscription_fixture(%{ 978 + agent_id: agent.id, 979 + seed_name: "nonexistent", 980 + seed_type: "nixos" 981 + }) 982 + 983 + request_id = "dr_test_error_#{System.unique_integer([:positive])}" 984 + 985 + # Should still return {:ok, request_id} since processing is async 986 + assert {:ok, ^request_id} = 987 + Orchestration.process_deployment(request_id, [subscription], agent) 988 + end 989 + end 990 + 904 991 defp unique_hash do 905 992 :crypto.strong_rand_bytes(16) |> Base.encode32(case: :lower) |> String.slice(0, 32) 906 993 end
+18 -8
apps/sower_agent/lib/sower_agent/client.ex
··· 191 191 {:ok, socket} 192 192 193 193 {:ok, deployment} -> 194 - Logger.debug( 195 - msg: "Received deployment", 196 - request_id: deployment.request_id, 197 - deployment_sid: deployment.sid 198 - ) 194 + if Map.has_key?(socket.active_deployments, deployment.sid) do 195 + Logger.debug( 196 + msg: "Ignoring duplicate deployment event", 197 + request_id: deployment.request_id, 198 + deployment_sid: deployment.sid 199 + ) 199 200 200 - socket = put_in(socket.active_deployments[deployment.sid], deployment) 201 - send(self(), {:run_deployment, deployment.sid}) 201 + {:ok, socket} 202 + else 203 + Logger.debug( 204 + msg: "Received deployment", 205 + request_id: deployment.request_id, 206 + deployment_sid: deployment.sid 207 + ) 202 208 203 - {:ok, socket} 209 + socket = put_in(socket.active_deployments[deployment.sid], deployment) 210 + send(self(), {:run_deployment, deployment.sid}) 211 + 212 + {:ok, socket} 213 + end 204 214 205 215 {:error, error} -> 206 216 Logger.error(msg: "Error casting deployment", error: error)
+233
apps/sower_agent/test/sower_agent/client_deployment_test.exs
··· 1 + defmodule SowerAgent.ClientDeploymentTest do 2 + @moduledoc """ 3 + Tests for deployment message handling and duplicate suppression. 4 + """ 5 + use ExUnit.Case, async: false 6 + 7 + alias SowerAgent.Client 8 + 9 + # Mock socket for testing handle_message callbacks 10 + defmodule MockSocket do 11 + defstruct [:assigns, :active_deployments] 12 + 13 + def new(agent_sid \\ "test_agent_123") do 14 + %__MODULE__{ 15 + assigns: %{agent_sid: agent_sid}, 16 + active_deployments: %{} 17 + } 18 + end 19 + 20 + def with_active_deployment(socket, deployment) do 21 + put_in(socket.active_deployments[deployment.sid], deployment) 22 + end 23 + end 24 + 25 + describe "handle_message for deployment events" do 26 + test "enqueues deployment for new deployment_sid" do 27 + socket = MockSocket.new() 28 + 29 + deployment = %SowerClient.Orchestration.Deployment{ 30 + sid: "deploy_123", 31 + request_id: "dr_456", 32 + seed_deployments: [], 33 + skipped: false 34 + } 35 + 36 + payload = Map.from_struct(deployment) 37 + 38 + {:ok, updated_socket} = 39 + Client.handle_message( 40 + "agent:test_agent_123", 41 + "deployment", 42 + payload, 43 + socket 44 + ) 45 + 46 + # Verify deployment was added to active_deployments 47 + assert Map.has_key?(updated_socket.active_deployments, "deploy_123") 48 + assert updated_socket.active_deployments["deploy_123"].sid == "deploy_123" 49 + end 50 + 51 + test "ignores duplicate deployment events for already active deployment" do 52 + deployment = %SowerClient.Orchestration.Deployment{ 53 + sid: "deploy_123", 54 + request_id: "dr_456", 55 + seed_deployments: [], 56 + skipped: false 57 + } 58 + 59 + socket = MockSocket.new() |> MockSocket.with_active_deployment(deployment) 60 + 61 + payload = Map.from_struct(deployment) 62 + 63 + {:ok, updated_socket} = 64 + Client.handle_message( 65 + "agent:test_agent_123", 66 + "deployment", 67 + payload, 68 + socket 69 + ) 70 + 71 + # Verify active_deployments hasn't changed (no duplicate added) 72 + assert map_size(updated_socket.active_deployments) == 1 73 + assert Map.has_key?(updated_socket.active_deployments, "deploy_123") 74 + # Verify the deployment wasn't replaced (same reference would mean same object) 75 + assert updated_socket.active_deployments["deploy_123"] == deployment 76 + end 77 + 78 + test "allows simultaneous deployments for different sids" do 79 + socket = MockSocket.new() 80 + 81 + deployment1 = %SowerClient.Orchestration.Deployment{ 82 + sid: "deploy_123", 83 + request_id: "dr_456", 84 + seed_deployments: [], 85 + skipped: false 86 + } 87 + 88 + deployment2 = %SowerClient.Orchestration.Deployment{ 89 + sid: "deploy_789", 90 + request_id: "dr_abc", 91 + seed_deployments: [], 92 + skipped: false 93 + } 94 + 95 + payload1 = Map.from_struct(deployment1) 96 + payload2 = Map.from_struct(deployment2) 97 + 98 + {:ok, socket} = 99 + Client.handle_message( 100 + "agent:test_agent_123", 101 + "deployment", 102 + payload1, 103 + socket 104 + ) 105 + 106 + {:ok, socket} = 107 + Client.handle_message( 108 + "agent:test_agent_123", 109 + "deployment", 110 + payload2, 111 + socket 112 + ) 113 + 114 + # Both deployments should be tracked 115 + assert map_size(socket.active_deployments) == 2 116 + assert Map.has_key?(socket.active_deployments, "deploy_123") 117 + assert Map.has_key?(socket.active_deployments, "deploy_789") 118 + end 119 + 120 + test "handles deployment:error event" do 121 + socket = MockSocket.new() 122 + 123 + payload = %{ 124 + "request_id" => "dr_error_123", 125 + "reason" => "seeds_not_found" 126 + } 127 + 128 + {:ok, updated_socket} = 129 + Client.handle_message( 130 + "agent:test_agent_123", 131 + "deployment:error", 132 + payload, 133 + socket 134 + ) 135 + 136 + # Socket should remain unchanged on error 137 + assert updated_socket.active_deployments == %{} 138 + end 139 + 140 + test "handles skipped deployment" do 141 + socket = MockSocket.new() 142 + 143 + deployment = %SowerClient.Orchestration.Deployment{ 144 + sid: "deploy_existing", 145 + request_id: "dr_skip_456", 146 + seed_deployments: [], 147 + skipped: true 148 + } 149 + 150 + payload = Map.from_struct(deployment) 151 + 152 + {:ok, updated_socket} = 153 + Client.handle_message( 154 + "agent:test_agent_123", 155 + "deployment", 156 + payload, 157 + socket 158 + ) 159 + 160 + # Skipped deployments should not be added to active_deployments 161 + assert updated_socket.active_deployments == %{} 162 + end 163 + 164 + test "handles invalid deployment payload gracefully" do 165 + socket = MockSocket.new() 166 + 167 + payload = %{ 168 + "invalid" => "data" 169 + } 170 + 171 + {:ok, _socket} = 172 + Client.handle_message( 173 + "agent:test_agent_123", 174 + "deployment", 175 + payload, 176 + socket 177 + ) 178 + 179 + # Should return {:ok, socket} even on error, not crash 180 + # The error is logged but not raised 181 + end 182 + 183 + test "duplicate suppression maintains separate state per deployment_sid" do 184 + socket = MockSocket.new() 185 + 186 + # First deployment - should be accepted 187 + deployment1 = %SowerClient.Orchestration.Deployment{ 188 + sid: "deploy_first", 189 + request_id: "dr_1", 190 + seed_deployments: [], 191 + skipped: false 192 + } 193 + 194 + {:ok, socket} = 195 + Client.handle_message( 196 + "agent:test_agent_123", 197 + "deployment", 198 + Map.from_struct(deployment1), 199 + socket 200 + ) 201 + 202 + # Try to add duplicate of first - should be ignored 203 + {:ok, socket} = 204 + Client.handle_message( 205 + "agent:test_agent_123", 206 + "deployment", 207 + Map.from_struct(deployment1), 208 + socket 209 + ) 210 + 211 + # Second deployment (different sid) - should be accepted 212 + deployment2 = %SowerClient.Orchestration.Deployment{ 213 + sid: "deploy_second", 214 + request_id: "dr_2", 215 + seed_deployments: [], 216 + skipped: false 217 + } 218 + 219 + {:ok, socket} = 220 + Client.handle_message( 221 + "agent:test_agent_123", 222 + "deployment", 223 + Map.from_struct(deployment2), 224 + socket 225 + ) 226 + 227 + # Should have both deployments, but not the duplicate 228 + assert map_size(socket.active_deployments) == 2 229 + assert Map.has_key?(socket.active_deployments, "deploy_first") 230 + assert Map.has_key?(socket.active_deployments, "deploy_second") 231 + end 232 + end 233 + end