Deployment and lifecycle management for Nix
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: trigger realtime deploys on seed creation

Add Durable workflow engine and a RealtimeDeploy workflow that fires
after Seed.create/2. It finds matching subscriptions with
allow_realtime enabled, checks window constraints, and triggers
deploy_subscription for each.

- Add durable dependency with migration and supervision tree setup
- Add within_window? helper to Subscription for time window evaluation
- Add find_realtime_subscriptions to filter by allow_realtime flag
- Skip org_id check in Repo for Durable's internal schema prefix
- Update nix deps lock

SOW-130

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+122 -3
+1
apps/sower/lib/sower/application.ex
··· 16 16 {Finch, name: Sower.Finch}, 17 17 {Phoenix.PubSub, name: Sower.PubSub}, 18 18 SowerWeb.Presence, 19 + {Durable, repo: Sower.Repo, queues: %{default: [concurrency: 10]}}, 19 20 {Task.Supervisor, name: Sower.TaskSupervisor}, 20 21 Sower.Orchestration.StaleDeploymentFinalizer, 21 22 SowerWeb.Endpoint,
+11 -2
apps/sower/lib/sower/orchestration/seed.ex
··· 76 76 end) 77 77 |> Repo.transact() 78 78 |> case do 79 - {:ok, %{seed: seed}} -> {:ok, Repo.preload(seed, [:tags])} 80 - {:error, _} = error -> error 79 + {:ok, %{seed: seed}} -> 80 + seed = Repo.preload(seed, [:tags]) 81 + trigger_realtime_deploys(seed) 82 + {:ok, seed} 83 + 84 + {:error, _} = error -> 85 + error 81 86 end 87 + end 88 + 89 + defp trigger_realtime_deploys(%Seed{} = seed) do 90 + Durable.start(Sower.Orchestration.Workflows.RealtimeDeploy, %{"seed_id" => seed.id}) 82 91 end 83 92 84 93 def create!(attrs, opts \\ []) do
+26
apps/sower/lib/sower/orchestration/subscription.ex
··· 143 143 |> Repo.all() 144 144 end 145 145 146 + def find_realtime_subscriptions(%Sower.Orchestration.Seed{} = seed) do 147 + seed 148 + |> find_subscription() 149 + |> Enum.filter(fn sub -> sub.allow_realtime end) 150 + end 151 + 152 + def within_window?(%__MODULE__{window: nil}, _now), do: true 153 + 154 + def within_window?(%__MODULE__{window: window}, now) do 155 + local = DateTime.shift_zone!(now, window.tz) 156 + day = local |> DateTime.to_date() |> Date.day_of_week() |> day_name() 157 + time = DateTime.to_time(local) 158 + 159 + day in window.days and 160 + Time.compare(time, Time.from_iso8601!("#{window.time_start}:00")) != :lt and 161 + Time.compare(time, Time.from_iso8601!("#{window.time_end}:00")) != :gt 162 + end 163 + 164 + defp day_name(1), do: "mon" 165 + defp day_name(2), do: "tue" 166 + defp day_name(3), do: "wed" 167 + defp day_name(4), do: "thu" 168 + defp day_name(5), do: "fri" 169 + defp day_name(6), do: "sat" 170 + defp day_name(7), do: "sun" 171 + 146 172 def create_subscription(attrs \\ %{}) do 147 173 attrs = Map.put_new_lazy(attrs, :name, fn -> attrs[:seed_name] end) 148 174
+48
apps/sower/lib/sower/orchestration/workflows/realtime_deploy.ex
··· 1 + defmodule Sower.Orchestration.Workflows.RealtimeDeploy do 2 + use Durable 3 + 4 + alias Sower.Orchestration.{Deployment, Subscription} 5 + 6 + require Logger 7 + 8 + workflow "realtime_deploy" do 9 + step(:find_subscriptions, fn input -> 10 + seed = Sower.Orchestration.Seed.get_by_id!(input["seed_id"]) 11 + subscriptions = Subscription.find_realtime_subscriptions(seed) 12 + {:ok, %{subscription_sids: Enum.map(subscriptions, & &1.sid)}} 13 + end) 14 + 15 + step(:deploy, [retry: [max_attempts: 3, backoff: :exponential]], fn data -> 16 + now = DateTime.utc_now() 17 + 18 + results = 19 + data.subscription_sids 20 + |> Enum.map(&Subscription.get_subscription_sid/1) 21 + |> Enum.reject(&is_nil/1) 22 + |> Enum.filter(&Subscription.within_window?(&1, now)) 23 + |> Enum.map(fn sub -> 24 + case Deployment.deploy_subscription(sub) do 25 + {:ok, request_id} -> 26 + Logger.info( 27 + msg: "Realtime deploy triggered", 28 + subscription_sid: sub.sid, 29 + request_id: request_id 30 + ) 31 + 32 + {:ok, sub.sid} 33 + 34 + {:error, reason} -> 35 + Logger.warning( 36 + msg: "Realtime deploy failed", 37 + subscription_sid: sub.sid, 38 + error: inspect(reason) 39 + ) 40 + 41 + {:error, sub.sid, reason} 42 + end 43 + end) 44 + 45 + {:ok, Map.put(data, :results, results)} 46 + end) 47 + end 48 + end
+1 -1
apps/sower/lib/sower/repo.ex
··· 29 29 def prepare_query(_operation, query, opts) do 30 30 cond do 31 31 opts[:skip_org_id] || opts[:ecto_query] in [:schema_migration, :preload] || 32 - opts[:schema_migration] -> 32 + opts[:schema_migration] || get_in(query.from, [Access.key(:prefix)]) == "durable" -> 33 33 {query, opts} 34 34 35 35 org_id = opts[:org_id] ->
+1
apps/sower/mix.exs
··· 31 31 {:bandit, "~> 1.0"}, 32 32 {:cloak_ecto, "~> 1.3.0"}, 33 33 {:cuid2_ex, "~> 0.2.0"}, 34 + {:durable, github: "wavezync/durable"}, 34 35 {:ecto_sql, "~> 3.11"}, 35 36 {:esbuild, "~> 0.8", runtime: Mix.env() == :dev}, 36 37 {:open_api_spex, "~> 3.22"},
+6
apps/sower/priv/repo/migrations/20260331150000_add_durable.exs
··· 1 + defmodule Sower.Repo.Migrations.AddDurable do 2 + use Ecto.Migration 3 + 4 + def up, do: Durable.Migration.up() 5 + def down, do: Durable.Migration.down() 6 + end
+1
mix.lock
··· 11 11 "decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"}, 12 12 "deps_nix": {:hex, :deps_nix, "2.6.2", "153a486fbfa76de007441060fdcdb38d533cc96b172c07ae5baff52dd2f21e82", [:mix], [{:ex_nar, "~> 0.3.0", [hex: :ex_nar, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: false]}], "hexpm", "9be50588be3769e68e5311c3fd1afe1e3c58883264198ef55121370f2da2604b"}, 13 13 "dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"}, 14 + "durable": {:git, "https://github.com/wavezync/durable.git", "739bec3196142fde4abc02a00ffddfa2b271dd80", []}, 14 15 "ecto": {:hex, :ecto, "3.13.5", "9d4a69700183f33bf97208294768e561f5c7f1ecf417e0fa1006e4a91713a834", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "df9efebf70cf94142739ba357499661ef5dbb559ef902b68ea1f3c1fabce36de"}, 15 16 "ecto_sql": {:hex, :ecto_sql, "3.13.5", "2f8282b2ad97bf0f0d3217ea0a6fff320ead9e2f8770f810141189d182dc304e", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.13.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "aa36751f4e6a2b56ae79efb0e088042e010ff4935fc8684e74c23b1f49e25fdc"}, 16 17 "elixir_make": {:hex, :elixir_make, "0.9.0", "6484b3cd8c0cee58f09f05ecaf1a140a8c97670671a6a0e7ab4dc326c3109726", [:mix], [], "hexpm", "db23d4fd8b757462ad02f8aa73431a426fe6671c80b200d9710caf3d1dd0ffdb"},
+27
nix/packages/deps.nix
··· 363 363 in 364 364 drv; 365 365 366 + durable = 367 + let 368 + version = "0.0.0-alpha"; 369 + drv = buildMix { 370 + inherit version; 371 + name = "durable"; 372 + appConfigPath = ../../config; 373 + 374 + src = fetchFromGitHub { 375 + owner = "wavezync"; 376 + repo = "durable"; 377 + rev = "739bec3196142fde4abc02a00ffddfa2b271dd80"; 378 + hash = "sha256-EcxpBWyq0VUg5iSEwGYo4ZUYbAASSwwTYtuIzYl95Xc="; 379 + }; 380 + 381 + beamDeps = [ 382 + ecto_sql 383 + postgrex 384 + jason 385 + telemetry 386 + nimble_options 387 + crontab 388 + ]; 389 + }; 390 + in 391 + drv; 392 + 366 393 ecto = 367 394 let 368 395 version = "3.13.5";