Deployment and lifecycle management for Nix
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

agent: merge subscription back into client schema

+51 -144
+40 -10
apps/sower_agent/lib/sower_agent/client.ex
··· 20 20 def handle_cast(:register_subscriptions, socket) do 21 21 subscriptions = 22 22 SowerAgent.Config.get().subscriptions 23 - |> Enum.map(fn agent_sub -> 24 - # Convert to client schema before sending to server 25 - client_sub = SowerAgent.Subscription.to_client_schema(agent_sub) 26 - 27 - with {:ok, ref} <- push_message(socket, client_sub), 23 + |> Enum.map(fn sub -> 24 + with {:ok, ref} <- push_message(socket, sub), 28 25 {:ok, response} <- await_reply(ref), 29 26 {:ok, registered} <- 30 27 SowerClient.Schemas.Orchestration.Subscription.cast(response) do 31 28 Logger.debug(registered) 32 29 33 - # Merge server-assigned sid back into agent subscription 34 - %{agent_sub | sid: registered.sid} 30 + # Merge server-assigned sid back into subscription 31 + %{sub | sid: registered.sid} 35 32 else 36 33 {:error, error} -> 37 34 Logger.error( 38 35 msg: "Failed to register subscription", 39 36 error: error, 40 - subscription: agent_sub 37 + subscription: sub 41 38 ) 42 39 43 40 nil ··· 45 42 :error -> 46 43 Logger.error( 47 44 msg: "Failed to register subscription with unknown error", 48 - subscription: agent_sub 45 + subscription: sub 49 46 ) 50 47 51 48 nil ··· 53 50 end) 54 51 |> Enum.reject(&is_nil/1) 55 52 56 - :ok = Enum.each(subscriptions, &SowerAgent.Subscription.start_schedule/1) 53 + :ok = Enum.each(subscriptions, &start_schedule/1) 57 54 58 55 SowerAgent.Storage.put(:subscriptions, subscriptions) 59 56 ··· 205 202 def private_channel(_socket) do 206 203 "agent:#{Storage.read().agent_sid}" 207 204 end 205 + 206 + defp start_schedule(%SowerClient.Schemas.Orchestration.Subscription{ 207 + sid: sid, 208 + schedule: schedule 209 + }) 210 + when not is_nil(sid) and not is_nil(schedule) do 211 + case Crontab.CronExpression.Parser.parse(schedule) do 212 + {:ok, cron} -> 213 + SowerAgent.Scheduler.new_job() 214 + |> Quantum.Job.set_name(:"sub_#{sid}") 215 + |> Quantum.Job.set_schedule(cron) 216 + |> Quantum.Job.set_task(fn -> 217 + Logger.debug( 218 + msg: "Running subscription schedule", 219 + subscription_sid: sid, 220 + schedule: schedule 221 + ) 222 + end) 223 + |> SowerAgent.Scheduler.add_job() 224 + 225 + {:error, error} -> 226 + Logger.error( 227 + msg: "Failed to parse schedule", 228 + error: error, 229 + schedule: schedule, 230 + subscription_sid: sid 231 + ) 232 + 233 + nil 234 + end 235 + end 236 + 237 + defp start_schedule(_), do: nil 208 238 end
+1 -24
apps/sower_agent/lib/sower_agent/config.ex
··· 40 40 }, 41 41 subscriptions: %Schema{ 42 42 type: :array, 43 - items: SowerAgent.Subscription, 43 + items: SowerClient.Schemas.Orchestration.Subscription, 44 44 default: [] 45 45 } 46 46 }, ··· 119 119 120 120 def process_side_effects({:state_directory, dir}, acc) do 121 121 Keyword.put(acc, :state_directory, Path.expand(dir)) 122 - end 123 - 124 - def process_side_effects({:subscriptions, subscriptions}, acc) 125 - when is_list(subscriptions) do 126 - normalized_subscriptions = 127 - Enum.map(subscriptions, fn subscription -> 128 - case subscription do 129 - %{schedule: schedule} when is_binary(schedule) -> 130 - case Crontab.CronExpression.Parser.parse(schedule) do 131 - {:ok, cron} -> 132 - %{subscription | schedule: cron} 133 - 134 - {:error, error} -> 135 - Logger.error(msg: "Failed to parse schedule", error: error) 136 - subscription 137 - end 138 - 139 - subscription -> 140 - subscription 141 - end 142 - end) 143 - 144 - Keyword.put(acc, :subscriptions, normalized_subscriptions) 145 122 end 146 123 147 124 def process_side_effects({:__struct__, _}, acc), do: acc
-110
apps/sower_agent/lib/sower_agent/subscription.ex
··· 1 - defmodule SowerAgent.Subscription do 2 - @moduledoc """ 3 - Agent has its own idea of subscription to avoid sending 4 - everything to the server m 5 - """ 6 - alias OpenApiSpex.Schema 7 - require OpenApiSpex 8 - require Logger 9 - 10 - alias SowerClient.Schemas.Orchestration.Subscription.Rule 11 - 12 - OpenApiSpex.schema(%{ 13 - title: "AgentSubscription", 14 - type: :object, 15 - properties: %{ 16 - # Fields shared with server (mirrors SowerClient.Schemas.Orchestration.Subscription) 17 - sid: %Schema{ 18 - type: :string, 19 - description: "Subscription sid allocated by Sower", 20 - readOnly: true, 21 - nullable: true 22 - }, 23 - seed_name: %Schema{ 24 - type: :string, 25 - description: "Name of the seed", 26 - example: "myhost" 27 - }, 28 - seed_type: %Schema{ 29 - type: :string, 30 - description: "Type of the seed", 31 - enum: SowerClient.Schemas.Seed.seed_types(), 32 - example: "nixos" 33 - }, 34 - rules: %Schema{ 35 - type: :array, 36 - items: Rule, 37 - default: [], 38 - description: "Tag-based rules to filter seeds" 39 - }, 40 - 41 - # Agent-only fields 42 - schedule: %Schema{ 43 - type: :string, 44 - description: "Cron expression for polling schedule", 45 - example: "*/15 * * * *", 46 - nullable: true 47 - }, 48 - poll_on_connect: %Schema{ 49 - type: :boolean, 50 - description: "Whether to request deployment immediately on connect", 51 - default: false 52 - } 53 - }, 54 - required: [:seed_name, :seed_type] 55 - }) 56 - 57 - @doc """ 58 - Convert to the client schema for sending to the server. 59 - Strips agent-only fields. 60 - """ 61 - def to_client_schema(%__MODULE__{} = sub) do 62 - %SowerClient.Schemas.Orchestration.Subscription{ 63 - sid: sub.sid, 64 - seed_name: sub.seed_name, 65 - seed_type: sub.seed_type, 66 - rules: sub.rules 67 - } 68 - end 69 - 70 - @doc """ 71 - Cast a map to the AgentSubscription struct with validation. 72 - """ 73 - def cast(attrs) do 74 - spec = build_spec() 75 - resolved_schema = spec.components.schemas["AgentSubscription"] 76 - OpenApiSpex.cast_value(attrs, resolved_schema, spec) 77 - end 78 - 79 - def cast!(attrs) do 80 - {:ok, val} = cast(attrs) 81 - val 82 - end 83 - 84 - def start_schedule(%__MODULE__{sid: sid, schedule: schedule} = sub) 85 - when not is_nil(sid) and not is_nil(schedule) do 86 - SowerAgent.Scheduler.new_job() 87 - |> Quantum.Job.set_name(:"sub_#{sid}") 88 - |> Quantum.Job.set_schedule(sub.schedule) 89 - |> Quantum.Job.set_task(fn -> 90 - Logger.debug( 91 - msg: "Running subscription schedule", 92 - subscription_sid: sid, 93 - schedule: schedule 94 - ) 95 - end) 96 - |> SowerAgent.Scheduler.add_job() 97 - end 98 - 99 - def start_schedule(_), do: nil 100 - 101 - defp build_spec do 102 - %OpenApiSpex.OpenApi{ 103 - info: %OpenApiSpex.Info{title: "AgentSubscription", version: "1.0.0"}, 104 - paths: %{}, 105 - components: nil 106 - } 107 - |> OpenApiSpex.resolve_schema_modules() 108 - |> OpenApiSpex.add_schemas([__MODULE__]) 109 - end 110 - end
+10
apps/sower_client/lib/schemas/orchestration/subscription.ex
··· 28 28 items: __MODULE__.Rule, 29 29 default: [], 30 30 description: "Tag-based rules to filter seeds" 31 + }, 32 + schedule: %Schema{ 33 + type: :string, 34 + description: "Cron expression for polling schedule (agent-only)", 35 + nullable: true 36 + }, 37 + poll_on_connect: %Schema{ 38 + type: :boolean, 39 + description: "Whether to request deployment immediately on connect (agent-only)", 40 + default: false 31 41 } 32 42 }, 33 43 required: [],