Deployment and lifecycle management for Nix
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: catch up missed scheduled deployments on garden reconnect

When a garden is offline during a scheduled deployment window, the
deployment is silently missed because schedule evaluation happens
client-side via Quantum. This adds server-side catch-up: after
subscription sync on reconnect, the server checks each scheduled
subscription against its last successful deployment time and triggers
deployments for any that are overdue.

Changes:
- Add schedule and timezone columns to subscriptions table
- Persist schedule/timezone during subscription sync
- Add catch_up_overdue_schedules/2 with timezone-aware cron evaluation
- Trigger catch-up after subscriptions:sync in garden channel
- Garden stamps timezone on subscriptions before sync

SOW-77
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+336 -5
+7
apps/garden/lib/garden/socket.ex
··· 55 55 @impl Slipstream 56 56 def handle_cast(:sync_subscriptions, socket) do 57 57 config_subscriptions = Garden.Config.get().subscriptions 58 + tz = Scheduler.get_timezone() 59 + 60 + config_subscriptions = 61 + Enum.map(config_subscriptions, fn sub -> 62 + if sub.schedule, do: %{sub | timezone: tz}, else: sub 63 + end) 64 + 58 65 sync_payload = %{subscriptions: Enum.map(config_subscriptions, &Map.from_struct/1)} 59 66 60 67 topic = private_channel(socket)
+1
apps/sower/lib/sower/orchestration.ex
··· 38 38 defdelegate register_subscription(req, garden_id), to: Subscription 39 39 defdelegate sync_subscriptions(subscriptions, garden_id), to: Subscription 40 40 defdelegate update_subscription(subscription, attrs), to: Subscription 41 + defdelegate catch_up_overdue_schedules(garden, opts \\ []), to: Subscription 41 42 42 43 # Deployment delegates 43 44 defdelegate change_deployment(deployment, attrs \\ %{}), to: Deployment
+109 -4
apps/sower/lib/sower/orchestration/subscription.ex
··· 3 3 import Ecto.Changeset 4 4 import Ecto.Query, warn: false 5 5 6 + require Logger 7 + 6 8 alias Sower.Repo 7 9 alias Sower.Orchestration.{Garden, Deployment, SubscriptionDeployment} 8 10 ··· 19 21 20 22 field :seed_name, :string 21 23 field :seed_type, :string 24 + field :schedule, :string 25 + field :timezone, :string 22 26 embeds_many :rules, __MODULE__.Rule 23 27 24 28 timestamps(type: :utc_datetime) ··· 27 31 @doc false 28 32 def changeset(subscription, attrs) do 29 33 subscription 30 - |> cast(attrs, [:garden_id, :seed_name, :seed_type]) 34 + |> cast(attrs, [:garden_id, :seed_name, :seed_type, :schedule, :timezone]) 31 35 |> cast_embed(:rules, with: &__MODULE__.Rule.changeset/2) 32 36 |> unique_constraint([:garden_id, :org_id, :seed_name, :seed_type]) 33 37 end ··· 130 134 } 131 135 |> changeset(attrs) 132 136 |> Repo.insert( 133 - on_conflict: {:replace, [:updated_at, :rules]}, 137 + on_conflict: {:replace, [:updated_at, :rules, :schedule, :timezone]}, 134 138 conflict_target: [:garden_id, :org_id, :seed_name, :seed_type], 135 139 returning: true 136 140 ) do ··· 143 147 %SowerClient.Orchestration.Subscription{ 144 148 seed_name: seed_name, 145 149 seed_type: seed_type, 146 - rules: rules 150 + rules: rules, 151 + schedule: schedule, 152 + timezone: timezone 147 153 }, 148 154 garden_id 149 155 ) do ··· 151 157 garden_id: garden_id, 152 158 seed_name: seed_name, 153 159 seed_type: seed_type, 154 - rules: rules 160 + rules: rules, 161 + schedule: schedule, 162 + timezone: timezone 155 163 }) do 156 164 {:ok, subscription} -> 157 165 {:ok, SowerClient.Orchestration.Subscription.cast!(subscription)} ··· 197 205 subscription 198 206 |> Repo.preload(:garden) 199 207 |> changeset(attrs) 208 + end 209 + 210 + # Schedule catch-up 211 + 212 + def catch_up_overdue_schedules(%Garden{} = garden, opts \\ []) do 213 + now = Keyword.get(opts, :now, DateTime.utc_now()) 214 + 215 + scheduled_subscriptions = list_scheduled_subscriptions_for_garden(garden) 216 + 217 + 
overdue = 218 + Enum.filter(scheduled_subscriptions, fn sub -> 219 + schedule_is_overdue?(sub, now) 220 + end) 221 + 222 + if overdue != [] do 223 + Logger.info( 224 + msg: "Found overdue scheduled subscriptions", 225 + garden_sid: garden.sid, 226 + count: length(overdue), 227 + subscription_sids: Enum.map(overdue, & &1.sid) 228 + ) 229 + end 230 + 231 + overdue 232 + end 233 + 234 + defp list_scheduled_subscriptions_for_garden(%Garden{} = garden) do 235 + from(s in __MODULE__, 236 + where: s.garden_id == ^garden.id, 237 + where: not is_nil(s.schedule), 238 + preload: [:garden] 239 + ) 240 + |> Repo.all() 241 + end 242 + 243 + defp schedule_is_overdue?(%__MODULE__{schedule: schedule, timezone: timezone} = sub, now) do 244 + case Crontab.CronExpression.Parser.parse(schedule) do 245 + {:ok, %{reboot: true}} -> 246 + false 247 + 248 + {:ok, cron} -> 249 + naive_now = to_local_naive(now, timezone) 250 + 251 + previous_run = 252 + Crontab.Scheduler.get_previous_run_date!(cron, naive_now) 253 + |> from_local_naive(timezone) 254 + 255 + last_deployed = last_successful_deployment_time(sub.id) 256 + 257 + case last_deployed do 258 + nil -> true 259 + deployed_at -> DateTime.compare(previous_run, deployed_at) == :gt 260 + end 261 + 262 + {:error, error} -> 263 + Logger.warning( 264 + msg: "Failed to parse schedule for catch-up", 265 + subscription_sid: sub.sid, 266 + schedule: schedule, 267 + error: error 268 + ) 269 + 270 + false 271 + end 272 + end 273 + 274 + defp last_successful_deployment_time(subscription_id) do 275 + from(d in Deployment, 276 + join: sd in SubscriptionDeployment, 277 + on: sd.deployment_id == d.id, 278 + where: sd.subscription_id == ^subscription_id, 279 + where: d.state == :completed and d.result == :success, 280 + order_by: [desc: d.deployed_at], 281 + limit: 1, 282 + select: d.deployed_at 283 + ) 284 + |> Repo.one() 285 + end 286 + 287 + defp to_local_naive(datetime, nil), do: DateTime.to_naive(datetime) 288 + defp to_local_naive(datetime, 
"Etc/UTC"), do: DateTime.to_naive(datetime) 289 + 290 + defp to_local_naive(datetime, tz) do 291 + datetime 292 + |> DateTime.shift_zone!(tz) 293 + |> DateTime.to_naive() 294 + end 295 + 296 + defp from_local_naive(naive, nil), do: DateTime.from_naive!(naive, "Etc/UTC") 297 + defp from_local_naive(naive, "Etc/UTC"), do: DateTime.from_naive!(naive, "Etc/UTC") 298 + 299 + defp from_local_naive(naive, tz) do 300 + case DateTime.from_naive(naive, tz) do 301 + {:ok, dt} -> DateTime.shift_zone!(dt, "Etc/UTC") 302 + {:ambiguous, dt, _} -> DateTime.shift_zone!(dt, "Etc/UTC") 303 + {:gap, _, just_after} -> DateTime.shift_zone!(just_after, "Etc/UTC") 304 + end 200 305 end 201 306 202 307 defmodule Rule do
+15
apps/sower/lib/sower_web/garden_channel.ex
··· 133 133 handle_schema(SowerClient.Orchestration.SubscriptionSync, fn req, socket -> 134 134 case Sower.Orchestration.sync_subscriptions(req.subscriptions, socket.assigns.garden.id) do 135 135 {:ok, subscriptions} -> 136 + send(self(), :catch_up_overdue_schedules) 136 137 {:ok, %{subscriptions: subscriptions}} 137 138 138 139 {:error, error} -> ··· 221 222 deployment_count: length(deployments) 222 223 ) 223 224 end 225 + 226 + {:noreply, socket} 227 + end 228 + 229 + def handle_info( 230 + :catch_up_overdue_schedules, 231 + %Phoenix.Socket{assigns: %{garden: garden}} = socket 232 + ) do 233 + Sower.Repo.put_org_id(garden.org_id) 234 + overdue = Orchestration.catch_up_overdue_schedules(garden) 235 + 236 + Enum.each(overdue, fn sub -> 237 + Orchestration.Deployment.deploy_subscription(sub) 238 + end) 224 239 225 240 {:noreply, socket} 226 241 end
+10
apps/sower/priv/repo/migrations/20260323023552_add_schedule_to_subscriptions.exs
defmodule Sower.Repo.Migrations.AddScheduleToSubscriptions do
  # Adds the two string columns that the Subscription schema casts and
  # persists: `schedule` and `timezone`. Neither column is given a default
  # or a null constraint here.
  use Ecto.Migration

  def change do
    alter(table(:subscriptions)) do
      add(:schedule, :string)
      add(:timezone, :string)
    end
  end
end
+188
apps/sower/test/sower/orchestration/subscription_schedule_test.exs
defmodule Sower.Orchestration.SubscriptionScheduleTest do
  # Exercises Subscription.catch_up_overdue_schedules/2, which returns the
  # scheduled subscriptions of a garden that it considers overdue.
  #
  # Tests that compare a deployment time against a specific cron firing pass
  # a fixed `:now` and fixed `deployed_at` values so the outcome does not
  # depend on the wall-clock time at which the suite runs.
  use Sower.DataCase, async: true

  alias Sower.Orchestration.Subscription

  import Sower.AccountsFixtures
  import Sower.OrchestrationFixtures
  import Sower.SeedFixtures

  setup do
    org = organization_fixture()
    Sower.Repo.put_org_id(org.org_id)
    garden = garden_fixture(%{org_id: org.org_id})
    seed = seed_fixture(%{org_id: org.org_id, name: "test-seed", seed_type: "nixos"})

    %{org: org, garden: garden, seed: seed}
  end

  describe "catch_up_overdue_schedules/2" do
    test "subscription with schedule and no prior deployments is overdue",
         %{garden: garden} do
      subscription_fixture(%{
        garden_id: garden.id,
        seed_name: "test-seed",
        seed_type: "nixos",
        schedule: "0 3 * * *"
      })

      overdue = Subscription.catch_up_overdue_schedules(garden)

      assert length(overdue) == 1
    end

    test "subscription with schedule and recent successful deployment is not overdue",
         %{garden: garden, seed: seed} do
      sub =
        subscription_fixture(%{
          garden_id: garden.id,
          seed_name: "test-seed",
          seed_type: "nixos",
          schedule: "0 3 * * *"
        })

      # Fixed timeline (UTC): cron fires at 03:00, deployment succeeds at
      # 03:10, catch-up is evaluated at 03:30. Deriving `deployed_at` from the
      # real clock while anchoring `now` to the last 03:00 made this test fail
      # whenever it ran between 03:00 and 03:30 UTC.
      deployed_at = ~U[2026-01-15 03:10:00Z]
      now = ~U[2026-01-15 03:30:00Z]

      deployment_fixture(%{
        garden_id: garden.id,
        seeds: [seed],
        subscriptions: [sub],
        state: :completed,
        result: :success,
        deployed_at: deployed_at
      })

      overdue = Subscription.catch_up_overdue_schedules(garden, now: now)

      assert overdue == []
    end

    test "subscription with deployment older than last cron run is overdue",
         %{garden: garden, seed: seed} do
      sub =
        subscription_fixture(%{
          garden_id: garden.id,
          seed_name: "test-seed",
          seed_type: "nixos",
          schedule: "* * * * *"
        })

      # Deploy 2 minutes ago — the every-minute schedule has fired since
      deployed_at =
        DateTime.utc_now()
        |> DateTime.add(-120, :second)
        |> DateTime.truncate(:second)

      deployment_fixture(%{
        garden_id: garden.id,
        seeds: [seed],
        subscriptions: [sub],
        state: :completed,
        result: :success,
        deployed_at: deployed_at
      })

      overdue = Subscription.catch_up_overdue_schedules(garden)

      assert length(overdue) == 1
      assert hd(overdue).sid == sub.sid
    end

    test "subscription without schedule is never overdue",
         %{garden: garden} do
      subscription_fixture(%{
        garden_id: garden.id,
        seed_name: "test-seed",
        seed_type: "nixos",
        schedule: nil
      })

      overdue = Subscription.catch_up_overdue_schedules(garden)

      assert overdue == []
    end

    test "failed deployments don't count as successful",
         %{garden: garden, seed: seed} do
      sub =
        subscription_fixture(%{
          garden_id: garden.id,
          seed_name: "test-seed",
          seed_type: "nixos",
          schedule: "* * * * *"
        })

      deployment_fixture(%{
        garden_id: garden.id,
        seeds: [seed],
        subscriptions: [sub],
        state: :completed,
        result: :failure,
        deployed_at: DateTime.truncate(DateTime.utc_now(), :second)
      })

      overdue = Subscription.catch_up_overdue_schedules(garden)

      assert length(overdue) == 1
    end

    test "stale deployments don't count as successful",
         %{garden: garden, seed: seed} do
      sub =
        subscription_fixture(%{
          garden_id: garden.id,
          seed_name: "test-seed",
          seed_type: "nixos",
          schedule: "* * * * *"
        })

      deployment_fixture(%{
        garden_id: garden.id,
        seeds: [seed],
        subscriptions: [sub],
        state: :stale,
        result: :failure,
        deployed_at: DateTime.truncate(DateTime.utc_now(), :second)
      })

      overdue = Subscription.catch_up_overdue_schedules(garden)

      assert length(overdue) == 1
    end

    test "timezone-aware evaluation",
         %{garden: garden, seed: seed} do
      # Schedule at 3am in America/New_York
      sub =
        subscription_fixture(%{
          garden_id: garden.id,
          seed_name: "test-seed",
          seed_type: "nixos",
          schedule: "0 3 * * *",
          timezone: "America/New_York"
        })

      # Fixed winter date, so New York is on EST (UTC-5):
      #   03:00 New York = 08:00 UTC, 04:00 New York = 09:00 UTC.
      #
      # The deployment at 03:30 UTC is after 03:00 UTC but before 03:00 New
      # York time. Only an evaluation of the cron expression in the
      # subscription's timezone reports the subscription as overdue; a
      # UTC-only evaluation would treat the 03:30 UTC deployment as covering
      # the latest run. Without this deployment the subscription would be
      # overdue regardless of timezone handling.
      deployment_fixture(%{
        garden_id: garden.id,
        seeds: [seed],
        subscriptions: [sub],
        state: :completed,
        result: :success,
        deployed_at: ~U[2026-01-15 03:30:00Z]
      })

      four_am_et = ~U[2026-01-15 09:00:00Z]

      overdue = Subscription.catch_up_overdue_schedules(garden, now: four_am_et)

      assert length(overdue) == 1
      assert hd(overdue).sid == sub.sid
    end
  end
end
+6 -1
apps/sower_client/lib/sower_client/orchestration/subscription.ex
··· 38 38 }, 39 39 schedule: %Schema{ 40 40 type: :string, 41 - description: "Cron expression for polling schedule (garden-only)", 41 + description: "Cron expression for polling schedule", 42 + nullable: true 43 + }, 44 + timezone: %Schema{ 45 + type: :string, 46 + description: "IANA timezone for schedule evaluation", 42 47 nullable: true 43 48 }, 44 49 poll_on_connect: %Schema{