Deployment and lifecycle management for Nix
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: replace durable dep with oban for realtime deploys

Replace the unmaintained Durable workflow library (GitHub-only dep) with
Oban for the RealtimeDeploy workflow. Improves the design by fanning out
to per-subscription deploy jobs instead of retrying the entire batch.

sow-143

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+303 -83
+7 -1
apps/sower/lib/sower/application.ex
··· 16 16 {Finch, name: Sower.Finch}, 17 17 {Phoenix.PubSub, name: Sower.PubSub}, 18 18 SowerWeb.Presence, 19 - {Durable, repo: Sower.Repo, queues: %{default: [concurrency: 10]}}, 19 + {Oban, oban_config()}, 20 20 {Task.Supervisor, name: Sower.TaskSupervisor}, 21 21 Sower.Orchestration.StaleDeploymentFinalizer, 22 22 SowerWeb.Endpoint, ··· 27 27 # for other strategies and supported options 28 28 opts = [strategy: :one_for_one, name: Sower.Supervisor] 29 29 Supervisor.start_link(children, opts) 30 + end 31 + 32 + @oban_testing if Mix.env() == :test, do: :manual, else: :disabled 33 + 34 + defp oban_config do 35 + [repo: Sower.Repo, queues: [default: 10], testing: @oban_testing] 30 36 end 31 37 32 38 # Tell Phoenix to update the endpoint configuration
+3 -1
apps/sower/lib/sower/orchestration/seed.ex
··· 112 112 end 113 113 114 114 defp trigger_realtime_deploys(%Seed{} = seed) do 115 - Durable.start(Sower.Orchestration.Workflows.RealtimeDeploy, %{"seed_id" => seed.id}) 115 + %{"seed_id" => seed.id, "org_id" => seed.org_id} 116 + |> Sower.Workers.RealtimeDeploy.new() 117 + |> Oban.insert() 116 118 end 117 119 118 120 def create!(attrs, opts \\ []) do
-48
apps/sower/lib/sower/orchestration/workflows/realtime_deploy.ex
··· 1 - defmodule Sower.Orchestration.Workflows.RealtimeDeploy do 2 - use Durable 3 - 4 - alias Sower.Orchestration.{Deployment, Subscription} 5 - 6 - require Logger 7 - 8 - workflow "realtime_deploy" do 9 - step(:find_subscriptions, fn input -> 10 - seed = Sower.Orchestration.Seed.get_by_id!(input["seed_id"]) 11 - subscriptions = Subscription.find_realtime_subscriptions(seed) 12 - {:ok, %{subscription_sids: Enum.map(subscriptions, & &1.sid)}} 13 - end) 14 - 15 - step(:deploy, [retry: [max_attempts: 3, backoff: :exponential]], fn data -> 16 - now = DateTime.utc_now() 17 - 18 - results = 19 - data.subscription_sids 20 - |> Enum.map(&Subscription.get_subscription_sid/1) 21 - |> Enum.reject(&is_nil/1) 22 - |> Enum.filter(&Subscription.within_window?(&1, now)) 23 - |> Enum.map(fn sub -> 24 - case Deployment.deploy_subscription(sub) do 25 - {:ok, request_id, _pid} -> 26 - Logger.info( 27 - msg: "Realtime deploy triggered", 28 - subscription_sid: sub.sid, 29 - request_id: request_id 30 - ) 31 - 32 - {:ok, sub.sid} 33 - 34 - {:error, reason} -> 35 - Logger.warning( 36 - msg: "Realtime deploy failed", 37 - subscription_sid: sub.sid, 38 - error: inspect(reason) 39 - ) 40 - 41 - {:error, sub.sid, reason} 42 - end 43 - end) 44 - 45 - {:ok, Map.put(data, :results, results)} 46 - end) 47 - end 48 - end
+6 -2
apps/sower/lib/sower/repo.ex
··· 31 31 def prepare_query(_operation, query, opts) do 32 32 cond do 33 33 opts[:skip_org_id] || opts[:ecto_query] in [:schema_migration, :preload] || 34 - opts[:schema_migration] || get_in(query.from, [Access.key(:prefix)]) == "durable" || 35 - boruta_table?(query) -> 34 + opts[:schema_migration] || oban_table?(query) || boruta_table?(query) -> 36 35 {query, opts} 37 36 38 37 org_id = opts[:org_id] -> ··· 42 41 raise "expected org_id or skip_org_id to be set" 43 42 end 44 43 end 44 + 45 + defp oban_table?(%{from: %{source: {table, _}}}) when is_binary(table), 46 + do: String.starts_with?(table, "oban_") 47 + 48 + defp oban_table?(_), do: false 45 49 46 50 defp boruta_table?(%{from: %{source: {table, _}}}) when table in @boruta_tables, do: true 47 51 defp boruta_table?(_), do: false
+52
apps/sower/lib/sower/workers/deploy_subscription.ex
··· 1 + defmodule Sower.Workers.DeploySubscription do 2 + use Oban.Worker, queue: :default, max_attempts: 3 3 + 4 + alias Sower.Orchestration.{Deployment, Subscription} 5 + 6 + require Logger 7 + 8 + @impl Oban.Worker 9 + def perform(%Oban.Job{args: %{"subscription_sid" => sid, "org_id" => org_id}}) do 10 + Sower.Repo.put_org_id(org_id) 11 + run(sid) 12 + end 13 + 14 + def run(sid, deploy_fun \\ &Deployment.deploy_subscription/1) do 15 + now = DateTime.utc_now() 16 + 17 + case Subscription.get_subscription_sid(sid) do 18 + nil -> 19 + Logger.warning(msg: "Subscription not found for deploy", subscription_sid: sid) 20 + :ok 21 + 22 + sub -> 23 + if Subscription.within_window?(sub, now) do 24 + deploy(sub, deploy_fun) 25 + else 26 + :ok 27 + end 28 + end 29 + end 30 + 31 + defp deploy(%Subscription{} = sub, deploy_fun) do 32 + case deploy_fun.(sub) do 33 + {:ok, request_id, _pid} -> 34 + Logger.info( 35 + msg: "Realtime deploy triggered", 36 + subscription_sid: sub.sid, 37 + request_id: request_id 38 + ) 39 + 40 + :ok 41 + 42 + {:error, reason} -> 43 + Logger.warning( 44 + msg: "Realtime deploy failed", 45 + subscription_sid: sub.sid, 46 + error: inspect(reason) 47 + ) 48 + 49 + {:error, reason} 50 + end 51 + end 52 + end
+21
apps/sower/lib/sower/workers/realtime_deploy.ex
··· 1 + defmodule Sower.Workers.RealtimeDeploy do 2 + use Oban.Worker, queue: :default, max_attempts: 3 3 + 4 + alias Sower.Orchestration.{Seed, Subscription} 5 + 6 + @impl Oban.Worker 7 + def perform(%Oban.Job{args: %{"seed_id" => seed_id, "org_id" => org_id}}) do 8 + Sower.Repo.put_org_id(org_id) 9 + 10 + seed = seed_id |> Seed.get_by_id!() |> Sower.Repo.preload([:tags]) 11 + subscriptions = Subscription.find_realtime_subscriptions(seed) 12 + 13 + subscriptions 14 + |> Enum.map(fn sub -> 15 + Sower.Workers.DeploySubscription.new(%{"subscription_sid" => sub.sid, "org_id" => org_id}) 16 + end) 17 + |> Oban.insert_all() 18 + 19 + :ok 20 + end 21 + end
+1 -1
apps/sower/mix.exs
··· 32 32 {:boruta, "~> 2.3"}, 33 33 {:cloak_ecto, "~> 1.3.0"}, 34 34 {:cuid2_ex, "~> 0.2.0"}, 35 - {:durable, github: "wavezync/durable"}, 35 + {:oban, "~> 2.21"}, 36 36 {:ecto_sql, "~> 3.11"}, 37 37 {:esbuild, "~> 0.8", runtime: Mix.env() == :dev}, 38 38 {:open_api_spex, "~> 3.22"},
+4 -2
apps/sower/priv/repo/migrations/20260331150000_add_durable.exs
··· 1 1 defmodule Sower.Repo.Migrations.AddDurable do 2 2 use Ecto.Migration 3 3 4 - def up, do: Durable.Migration.up() 5 - def down, do: Durable.Migration.down() 4 + # Durable was replaced by Oban in migration 20260409035412. 5 + # This migration is kept as a no-op since it already ran in existing databases. 6 + def up, do: :ok 7 + def down, do: :ok 6 8 end
+13
apps/sower/priv/repo/migrations/20260409035412_replace_durable_with_oban.exs
··· 1 + defmodule Sower.Repo.Migrations.ReplaceDurableWithOban do 2 + use Ecto.Migration 3 + 4 + def up do 5 + Oban.Migration.up(version: 12) 6 + execute("DROP SCHEMA IF EXISTS durable CASCADE") 7 + end 8 + 9 + def down do 10 + execute("DROP SCHEMA IF EXISTS public_oban CASCADE") 11 + Oban.Migration.down(version: 1) 12 + end 13 + end
+85
apps/sower/test/sower/workers/deploy_subscription_test.exs
··· 1 + defmodule Sower.Workers.DeploySubscriptionTest do 2 + use Sower.DataCase 3 + 4 + import Sower.AccountsFixtures 5 + import Sower.OrchestrationFixtures 6 + import Sower.SeedFixtures 7 + 8 + alias Sower.Workers.DeploySubscription 9 + 10 + setup do 11 + org = organization_fixture() 12 + Sower.Repo.put_org_id(org.org_id) 13 + %{org: org} 14 + end 15 + 16 + describe "run/2" do 17 + @tag :capture_log 18 + test "returns :ok when subscription not found" do 19 + assert :ok = DeploySubscription.run("sub_nonexistent") 20 + end 21 + 22 + test "returns :ok when subscription is outside window" do 23 + garden = garden_fixture() 24 + 25 + sub = 26 + subscription_fixture(%{ 27 + garden_id: garden.id, 28 + seed_name: "myhost", 29 + seed_type: "nixos", 30 + allow_realtime: true, 31 + window: %{ 32 + days: ["mon"], 33 + time_start: "00:00", 34 + time_end: "00:01", 35 + tz: "Pacific/Kiritimati" 36 + } 37 + }) 38 + 39 + assert :ok = DeploySubscription.run(sub.sid) 40 + end 41 + 42 + @tag :capture_log 43 + test "calls deploy function for subscription within window" do 44 + garden = garden_fixture() 45 + seed_fixture(%{name: "myhost", seed_type: "nixos"}) 46 + 47 + sub = 48 + subscription_fixture(%{ 49 + garden_id: garden.id, 50 + seed_name: "myhost", 51 + seed_type: "nixos", 52 + allow_realtime: true 53 + }) 54 + 55 + test_pid = self() 56 + 57 + deploy_fun = fn subscription -> 58 + send(test_pid, {:deployed, subscription.sid}) 59 + {:ok, "req_test123", self()} 60 + end 61 + 62 + assert :ok = DeploySubscription.run(sub.sid, deploy_fun) 63 + assert_received {:deployed, sid} 64 + assert sid == sub.sid 65 + end 66 + 67 + @tag :capture_log 68 + test "returns error when deploy fails" do 69 + garden = garden_fixture() 70 + seed_fixture(%{name: "myhost", seed_type: "nixos"}) 71 + 72 + sub = 73 + subscription_fixture(%{ 74 + garden_id: garden.id, 75 + seed_name: "myhost", 76 + seed_type: "nixos", 77 + allow_realtime: true 78 + }) 79 + 80 + deploy_fun = fn _sub -> {:error, :connection_refused} end 81 + 82 + assert {:error, :connection_refused} = DeploySubscription.run(sub.sid, deploy_fun) 83 + end 84 + end 85 + end
+85
apps/sower/test/sower/workers/realtime_deploy_test.exs
··· 1 + defmodule Sower.Workers.RealtimeDeployTest do 2 + use Sower.DataCase 3 + 4 + use Oban.Testing, repo: Sower.Repo 5 + 6 + import Sower.AccountsFixtures 7 + import Sower.OrchestrationFixtures 8 + import Sower.SeedFixtures 9 + 10 + alias Sower.Workers.{DeploySubscription, RealtimeDeploy} 11 + 12 + setup do 13 + org = organization_fixture() 14 + Sower.Repo.put_org_id(org.org_id) 15 + %{org: org} 16 + end 17 + 18 + describe "perform/1" do 19 + test "enqueues deploy jobs for realtime subscriptions", %{org: org} do 20 + garden = garden_fixture() 21 + 22 + seed = 23 + seed_fixture(%{ 24 + name: "myhost", 25 + seed_type: "nixos" 26 + }) 27 + 28 + subscription_fixture(%{ 29 + garden_id: garden.id, 30 + seed_name: "myhost", 31 + seed_type: "nixos", 32 + allow_realtime: true 33 + }) 34 + 35 + assert :ok = 36 + perform_job(RealtimeDeploy, %{ 37 + seed_id: seed.id, 38 + org_id: org.org_id 39 + }) 40 + 41 + assert_enqueued(worker: DeploySubscription) 42 + end 43 + 44 + test "does not enqueue jobs when no realtime subscriptions exist", %{org: org} do 45 + seed = 46 + seed_fixture(%{ 47 + name: "myhost", 48 + seed_type: "nixos" 49 + }) 50 + 51 + assert :ok = 52 + perform_job(RealtimeDeploy, %{ 53 + seed_id: seed.id, 54 + org_id: org.org_id 55 + }) 56 + 57 + refute_enqueued(worker: DeploySubscription) 58 + end 59 + 60 + test "skips subscriptions with allow_realtime false", %{org: org} do 61 + garden = garden_fixture() 62 + 63 + seed = 64 + seed_fixture(%{ 65 + name: "myhost", 66 + seed_type: "nixos" 67 + }) 68 + 69 + subscription_fixture(%{ 70 + garden_id: garden.id, 71 + seed_name: "myhost", 72 + seed_type: "nixos", 73 + allow_realtime: false 74 + }) 75 + 76 + assert :ok = 77 + perform_job(RealtimeDeploy, %{ 78 + seed_id: seed.id, 79 + org_id: org.org_id 80 + }) 81 + 82 + refute_enqueued(worker: DeploySubscription) 83 + end 84 + end 85 + end
+1
mix.lock
··· 53 53 "nebulex": {:hex, :nebulex, "2.6.6", "677e27fcfa89eaa085d9509d5e066f305f98c1b2264ce6676eaca6fb08d4939e", [:mix], [{:decorator, "~> 1.4", [hex: :decorator, repo: "hexpm", optional: true]}, {:shards, "~> 1.1", [hex: :shards, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "8cbf531af6fe407383b6ba410a43a19319af47804929d8a8d1975a780b9952df"}, 54 54 "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, 55 55 "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, 56 + "oban": {:hex, :oban, "2.21.1", "4b6af7b901ef9baca09e239b5a991ef2fa429cf5a13799bc429a131d610ff692", [:mix], [{:ecto_sql, "~> 3.10", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:ecto_sqlite3, "~> 0.9", [hex: :ecto_sqlite3, repo: "hexpm", optional: true]}, {:igniter, "~> 0.5", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.20", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 1.3", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "8162a160924cf4a25905fed2a9242e7787d88e320e3b5b0dcf324eb17c51c4e6"}, 56 57 "oidcc": {:hex, :oidcc, "3.7.0", "0aad613c4c44d1e807c3bc0a6e01c353b9627303447f86af429aeac2d08c0a9e", [:mix, :rebar3], [{:igniter, "~> 0.6.3", [hex: :igniter, repo: "hexpm", optional: true]}, {:jose, "~> 1.11", [hex: :jose, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.2", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_registry, "~> 0.3.1", [hex: :telemetry_registry, repo: "hexpm", optional: false]}], "hexpm", "a7414b744dcd64d238dc48e93a9f298b7d18e06ae5e699a66c54dd43bd594fae"}, 57 58 "open_api_spex": {:hex, :open_api_spex, "3.22.2", "0b3c4f572ee69cb6c936abf426b9d84d8eebd34960871fd77aead746f0d69cb0", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:poison, "~> 3.0 or ~> 4.0 or ~> 5.0 or ~> 6.0", [hex: :poison, repo: "hexpm", optional: true]}, {:ymlr, "~> 2.0 or ~> 3.0 or ~> 4.0 or ~> 5.0", [hex: :ymlr, repo: "hexpm", optional: true]}], "hexpm", "0a4fc08472d75e9cfe96e0748c6b1565b3b4398f97bf43fcce41b41b6fd3fb33"}, 58 59 "optimus": {:hex, :optimus, "0.6.1", "b493bdc7ee71035fdf8cbfbf158b1863e2d622b65c3906153e4042815c901fca", [:mix], [], "hexpm", "c0db4107a51f5af94de8b05e4208333ebb8016a3bfdbcd74df6e5c99829db17f"},
+25 -28
nix/packages/deps.nix
··· 421 421 in 422 422 drv; 423 423 424 - durable = 425 - let 426 - version = "0.0.0-alpha"; 427 - drv = buildMix { 428 - inherit version; 429 - name = "durable"; 430 - appConfigPath = ../../config; 431 - 432 - src = fetchFromGitHub { 433 - owner = "wavezync"; 434 - repo = "durable"; 435 - rev = "739bec3196142fde4abc02a00ffddfa2b271dd80"; 436 - hash = "sha256-EcxpBWyq0VUg5iSEwGYo4ZUYbAASSwwTYtuIzYl95Xc="; 437 - }; 438 - 439 - beamDeps = [ 440 - ecto_sql 441 - postgrex 442 - jason 443 - telemetry 444 - nimble_options 445 - crontab 446 - igniter 447 - ]; 448 - }; 449 - in 450 - drv; 451 - 452 424 ecto = 453 425 let 454 426 version = "3.13.5"; ··· 1154 1126 pkg = "nimble_pool"; 1155 1127 sha256 = "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"; 1156 1128 }; 1129 + }; 1130 + in 1131 + drv; 1132 + 1133 + oban = 1134 + let 1135 + version = "2.21.1"; 1136 + drv = buildMix { 1137 + inherit version; 1138 + name = "oban"; 1139 + appConfigPath = ../../config; 1140 + 1141 + src = fetchHex { 1142 + inherit version; 1143 + pkg = "oban"; 1144 + sha256 = "8162a160924cf4a25905fed2a9242e7787d88e320e3b5b0dcf324eb17c51c4e6"; 1145 + }; 1146 + 1147 + beamDeps = [ 1148 + ecto_sql 1149 + igniter 1150 + jason 1151 + postgrex 1152 + telemetry 1153 + ]; 1157 1154 }; 1158 1155 in 1159 1156 drv;