Deployment and lifecycle management for Nix
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: unify deployment reconciliation on garden reconnect

Replace separate replay and catch-up flows with a single
reconcile_deployments_on_connect that gathers overdue schedules and
unresolved deployments, cancels unresolved deployments superseded by
overdue schedules, replays valid unresolved deployments, and deploys
fresh for all overdue subscriptions. Remove now-dead
replay_unresolved_deployments function.

SOW-77
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+90 -63
+1
CLAUDE.md
··· 25 25 - Prefer red/green TDD. If unsure what style of testing, stop and ask. 26 26 - Always read code for project elixir dependencies from `deps`. Never query hexdocs or hex. 27 27 - SowerClient schemas must *always* be added to `sower_client.ex` 28 + - Remove dead code 28 29 29 30 ## Testing 30 31
-1
apps/sower/lib/sower/orchestration.ex
··· 57 57 defdelegate match_seed(subscription), to: Deployment 58 58 defdelegate process_deployment(request_id, subscriptions, garden, opts \\ []), to: Deployment 59 59 defdelegate record_deployment(result), to: Deployment 60 - defdelegate replay_unresolved_deployments(garden, opts \\ []), to: Deployment 61 60 defdelegate request_deployment(request), to: Deployment 62 61 defdelegate retry_deployment(deployment, user_id), to: Deployment 63 62 defdelegate update_deployment(deployment, attrs), to: Deployment
+35 -18
apps/sower/lib/sower/orchestration/deployment.ex
··· 253 253 end) 254 254 end 255 255 256 - # Replay 256 + # Reconcile 257 + 258 + def reconcile_deployments_on_connect(%Garden{} = garden, opts \\ []) do 259 + now = Keyword.get(opts, :now, DateTime.utc_now()) 257 260 258 - def replay_unresolved_deployments(%Garden{} = garden, opts \\ []) do 259 - broadcast_fun = Keyword.get(opts, :broadcast_fun, &SowerWeb.Endpoint.broadcast/3) 261 + # 1. Gather what needs attention 262 + overdue = Subscription.catch_up_overdue_schedules(garden, now: now) 263 + overdue_sub_ids = MapSet.new(overdue, & &1.id) 264 + unresolved = list_unresolved_deployments_for_garden(garden) 265 + 266 + # 2. Partition unresolved: replay those not superseded, cancel those that are 267 + {to_cancel, to_replay} = 268 + Enum.split_with(unresolved, fn deployment -> 269 + deployment.subscriptions 270 + |> Enum.any?(fn sub -> MapSet.member?(overdue_sub_ids, sub.id) end) 271 + end) 260 272 261 - request_id_fun = 262 - Keyword.get(opts, :request_id_fun, fn -> SowerClient.Sid.generate("request") end) 273 + Enum.each(to_cancel, fn deployment -> 274 + update_deployment(deployment, %{state: :stale, result: :failure}) 275 + end) 263 276 264 - now = Keyword.get(opts, :now, DateTime.utc_now()) 277 + # 3. Replay valid unresolved deployments 278 + mark_deployments_dispatched(to_replay, now) 265 279 266 - deployments = list_unresolved_deployments_for_garden(garden) 267 - mark_deployments_dispatched(deployments, now) 280 + Enum.each(to_replay, fn deployment -> 281 + request_id = SowerClient.Sid.generate("request") 282 + payload = deployment_event_payload(deployment, request_id) 283 + SowerWeb.Endpoint.broadcast("garden:#{garden.sid}", "deployment", payload) 284 + SowerWeb.Endpoint.broadcast("agent:#{garden.sid}", "deployment", payload) 285 + end) 268 286 269 - Enum.each(deployments, fn deployment -> 270 - payload = deployment_event_payload(deployment, request_id_fun.()) 271 - broadcast_fun.("garden:#{garden.sid}", "deployment", payload) 272 - # Backward compatibility: 0.7.0 gardens join "agent:*" 273 - broadcast_fun.("agent:#{garden.sid}", "deployment", payload) 287 + # 4. Deploy fresh for all overdue subscriptions 288 + Enum.each(overdue, fn sub -> 289 + deploy_subscription(sub) 274 290 end) 275 291 276 - if deployments != [] do 292 + if to_replay != [] or overdue != [] or to_cancel != [] do 277 293 Logger.info( 278 - msg: "Replayed unresolved deployments", 294 + msg: "Reconciled deployments on connect", 279 295 garden_sid: garden.sid, 280 - deployment_count: length(deployments), 281 - deployment_sids: Enum.map(deployments, & &1.sid) 296 + replayed_count: length(to_replay), 297 + cancelled_count: length(to_cancel), 298 + overdue_count: length(overdue) 282 299 ) 283 300 end 284 301 285 - {:ok, deployments} 302 + {:ok, %{replayed: to_replay, cancelled: to_cancel, overdue: overdue}} 286 303 end 287 304 288 305 # Seed matching
+3 -27
apps/sower/lib/sower_web/garden_channel.ex
··· 78 78 79 79 garden when garden.local_sid == local_sid -> 80 80 send(self(), :track_presence) 81 - send(self(), :replay_unresolved_deployments) 81 + send(self(), :reconcile_deployments) 82 82 {:ok, %{conn_sid: conn_sid}, assign(socket, :garden, garden)} 83 83 84 84 _ -> ··· 133 133 handle_schema(SowerClient.Orchestration.SubscriptionSync, fn req, socket -> 134 134 case Sower.Orchestration.sync_subscriptions(req.subscriptions, socket.assigns.garden.id) do 135 135 {:ok, subscriptions} -> 136 - send(self(), :catch_up_overdue_schedules) 137 136 {:ok, %{subscriptions: subscriptions}} 138 137 139 138 {:error, error} -> ··· 210 209 end 211 210 212 211 def handle_info( 213 - :replay_unresolved_deployments, 214 - %Phoenix.Socket{assigns: %{garden: garden}} = socket 215 - ) do 216 - {:ok, deployments} = Orchestration.replay_unresolved_deployments(garden) 217 - 218 - if deployments != [] do 219 - Logger.debug( 220 - msg: "Replayed unresolved deployments after garden join", 221 - garden_sid: garden.sid, 222 - deployment_count: length(deployments) 223 - ) 224 - end 225 - 226 - {:noreply, socket} 227 - end 228 - 229 - def handle_info( 230 - :catch_up_overdue_schedules, 212 + :reconcile_deployments, 231 213 %Phoenix.Socket{assigns: %{garden: garden}} = socket 232 214 ) do 233 - Sower.Repo.put_org_id(garden.org_id) 234 - overdue = Orchestration.catch_up_overdue_schedules(garden) 235 - 236 - Enum.each(overdue, fn sub -> 237 - Orchestration.Deployment.deploy_subscription(sub) 238 - end) 239 - 215 + Orchestration.Deployment.reconcile_deployments_on_connect(garden) 240 216 {:noreply, socket} 241 217 end 242 218
+11 -8
apps/sower/test/sower/orchestration/subscription_schedule_test.exs
··· 41 41 schedule: "0 3 * * *" 42 42 }) 43 43 44 - # Deploy 30 minutes ago — within the current schedule window 45 - deployed_at = DateTime.add(DateTime.utc_now(), -30 * 60, :second) 44 + # Compute a stable now: 30 minutes after the most recent 3am UTC 45 + {:ok, cron} = Crontab.CronExpression.Parser.parse("0 3 * * *") 46 + previous_run = Crontab.Scheduler.get_previous_run_date!(cron, NaiveDateTime.utc_now()) 47 + now = DateTime.from_naive!(previous_run, "Etc/UTC") |> DateTime.add(30 * 60, :second) 48 + 49 + # Deploy 15 minutes after the cron run — within the current window 50 + deployed_at = 51 + DateTime.from_naive!(previous_run, "Etc/UTC") 52 + |> DateTime.add(15 * 60, :second) 53 + |> DateTime.truncate(:second) 46 54 47 55 deployment_fixture(%{ 48 56 garden_id: garden.id, ··· 50 58 subscriptions: [sub], 51 59 state: :completed, 52 60 result: :success, 53 - deployed_at: DateTime.truncate(deployed_at, :second) 61 + deployed_at: deployed_at 54 62 }) 55 - 56 - # Set now to 30 minutes after the most recent 3am UTC 57 - {:ok, cron} = Crontab.CronExpression.Parser.parse("0 3 * * *") 58 - previous_run = Crontab.Scheduler.get_previous_run_date!(cron, NaiveDateTime.utc_now()) 59 - now = DateTime.from_naive!(previous_run, "Etc/UTC") |> DateTime.add(30 * 60, :second) 60 63 61 64 overdue = Subscription.catch_up_overdue_schedules(garden, now: now) 62 65
+39 -8
apps/sower/test/sower/orchestration_test.exs
··· 989 989 end 990 990 end 991 991 992 - describe "replay_unresolved_deployments/2" do 992 + describe "reconcile_deployments_on_connect/2" do 993 993 import Sower.OrchestrationFixtures 994 994 995 995 test "replays unresolved deployments and updates dispatch timestamp", %{organization: _org} do ··· 1025 1025 replayed_at = DateTime.utc_now() |> DateTime.truncate(:second) 1026 1026 Phoenix.PubSub.subscribe(Sower.PubSub, "agent:#{garden.sid}") 1027 1027 1028 - assert {:ok, deployments} = 1029 - Orchestration.replay_unresolved_deployments(garden, 1030 - now: replayed_at, 1031 - request_id_fun: fn -> "request_replay_1" end 1032 - ) 1028 + assert {:ok, %{replayed: replayed, cancelled: [], overdue: []}} = 1029 + Orchestration.Deployment.reconcile_deployments_on_connect(garden, now: replayed_at) 1033 1030 1034 - assert Enum.map(deployments, & &1.sid) == [unresolved.sid] 1031 + assert Enum.map(replayed, & &1.sid) == [unresolved.sid] 1035 1032 1036 1033 assert_receive %Phoenix.Socket.Broadcast{ 1037 1034 topic: topic, ··· 1042 1039 assert topic == "agent:#{garden.sid}" 1043 1040 assert payload.sid == unresolved.sid 1044 1041 assert payload.skipped == false 1045 - assert payload.request_id == "request_replay_1" 1042 + assert is_binary(payload.request_id) 1046 1043 1047 1044 refreshed = Orchestration.get_deployment_sid!(unresolved.sid) 1048 1045 1049 1046 assert DateTime.truncate(refreshed.last_dispatched_at, :second) == 1050 1047 DateTime.truncate(replayed_at, :second) 1048 + end 1049 + 1050 + test "cancels unresolved deployments superseded by overdue schedules", %{organization: _org} do 1051 + garden = garden_fixture() 1052 + seed = seed_fixture(%{name: "cancel-host", seed_type: "nixos"}) 1053 + 1054 + subscription = 1055 + subscription_fixture(%{ 1056 + garden_id: garden.id, 1057 + seed_name: seed.name, 1058 + seed_type: seed.seed_type, 1059 + schedule: "* * * * *" 1060 + }) 1061 + 1062 + unresolved = 1063 + deployment_fixture(%{ 1064 + garden_id: garden.id, 1065 + seeds: [seed], 1066 + subscriptions: [subscription], 1067 + result: nil, 1068 + deployed_at: nil 1069 + }) 1070 + 1071 + assert {:ok, %{replayed: [], cancelled: cancelled, overdue: overdue}} = 1072 + Orchestration.Deployment.reconcile_deployments_on_connect(garden) 1073 + 1074 + assert length(cancelled) == 1 1075 + assert hd(cancelled).sid == unresolved.sid 1076 + assert length(overdue) == 1 1077 + assert hd(overdue).sid == subscription.sid 1078 + 1079 + refreshed = Orchestration.get_deployment_sid!(unresolved.sid) 1080 + assert refreshed.state == :stale 1081 + assert refreshed.result == :failure 1051 1082 end 1052 1083 end 1053 1084
+1 -1
apps/sower/test/sower_web/channels/garden_channel_test.exs
··· 98 98 end 99 99 end 100 100 101 - describe "replay_unresolved_deployments on join" do 101 + describe "reconcile_deployments on join" do 102 102 test "replays unresolved deployments and skips terminal ones" do 103 103 user = user_fixture() 104 104 Sower.Repo.put_org_id(user.org_id)