Deployment and lifecycle management for Nix
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

cli: get realtime updates from jobs runners

+108 -11
+10 -2
apps/nix/lib/nix/build/jobs.ex
··· 15 15 typedstruct do 16 16 field :evals, list(Nix.Eval.t()) 17 17 field :max_workers, integer(), default: @default_max_workers 18 + field :notify_pid, pid() 18 19 field :supervisor, pid() 19 20 field :from, {pid(), term()} 20 21 end ··· 56 57 state = %__MODULE__{ 57 58 evals: evals, 58 59 max_workers: Keyword.get(opts, :max_workers, @default_max_workers), 60 + notify_pid: Keyword.get(opts, :notify_pid), 59 61 supervisor: supervisor 60 62 } 61 63 ··· 76 78 Task.Supervisor.async_stream_nolink( 77 79 state.supervisor, 78 80 state.evals, 79 - fn eval -> run_build(eval) end, 81 + fn eval -> run_build(eval, state.notify_pid) end, 80 82 max_concurrency: state.max_workers, 81 83 ordered: false, 82 84 timeout: :infinity ··· 128 130 :ok 129 131 end 130 132 131 - defp run_build(eval) do 133 + defp run_build(eval, notify_pid) do 134 + drv_path = eval.output["drvPath"] 135 + notify(notify_pid, {:build_started, drv_path}) 132 136 {_status, build} = Build.run(eval) 137 + notify(notify_pid, {:build_completed, drv_path, build.status}) 133 138 build 134 139 end 135 140 ··· 152 157 153 158 defp format_crash_reason(:normal), do: "Task exited normally without result" 154 159 defp format_crash_reason(reason), do: Exception.format_exit(reason) 160 + 161 + defp notify(nil, _event), do: :ok 162 + defp notify(pid, event), do: send(pid, event) 155 163 end
+13 -2
apps/nix/lib/nix/eval/jobs.ex
··· 15 15 field :max_workers, integer() 16 16 field :memory_limit_kb, integer() 17 17 field :use_eval_cache, boolean(), default: false 18 + field :notify_pid, pid() 18 19 field :from, {pid(), term()} 19 20 field :supervisor, pid() 20 21 field :start_time, DateTime.t() ··· 54 55 max_workers: Keyword.get(opts, :workers, 8), 55 56 memory_limit_kb: Keyword.get(opts, :memory_limit_kb, 4_000_000), 56 57 use_eval_cache: Keyword.get(opts, :use_eval_cache, false), 58 + notify_pid: Keyword.get(opts, :notify_pid), 57 59 from: nil, 58 60 request: request, 59 61 supervisor: supervisor, ··· 104 106 # Spawn workers for dequeued requests 105 107 running = 106 108 Enum.reduce(requests, state.running, fn request, acc_running -> 109 + notify(state, {:eval_started, request.attr}) 110 + 107 111 task = 108 112 Task.Supervisor.async(state.supervisor, fn -> 109 113 evaluate_request(request, state) ··· 222 226 {request, running} -> 223 227 state = 224 228 case result do 225 - {:leaf, result} -> 226 - %{state | results: [result | state.results]} 229 + {:leaf, %Nix.Eval{status: status} = eval_result} -> 230 + notify(state, {:eval_completed, request.attr, status}) 231 + %{state | results: [eval_result | state.results]} 227 232 228 233 {:branch, new_targets} -> 229 234 # Bulk enqueue - now fast with ETS! 235 + notify(state, {:eval_completed, request.attr, :branch}) 230 236 Nix.Eval.Queue.enqueue(state.queue, state.queue_counter, new_targets) 231 237 state 232 238 ··· 236 242 result: unexpected, 237 243 request: request 238 244 ) 245 + 246 + notify(state, {:eval_completed, request.attr, :error}) 239 247 240 248 error_eval = %Nix.Eval{ 241 249 request: request, ··· 328 336 329 337 defp format_crash_reason(:normal), do: "Task exited normally without result" 330 338 defp format_crash_reason(reason), do: Exception.format_exit(reason) 339 + 340 + defp notify(%__MODULE__{notify_pid: nil}, _event), do: :ok 341 + defp notify(%__MODULE__{notify_pid: pid}, event), do: send(pid, event) 331 342 end
+64 -7
apps/sower_cli/lib/sower_cli/build.ex
··· 91 91 workers: state.options.eval_jobs, 92 92 type: state.options.eval_type, 93 93 use_eval_cache: state.flags.use_eval_cache, 94 - memory_limit_kb: state.options.memory_limit * 1_000 94 + memory_limit_kb: state.options.memory_limit * 1_000, 95 + notify_pid: self() 95 96 ] 96 97 97 - case Nix.Eval.Jobs.run(state.flake, opts) do 98 + task = Task.async(fn -> Nix.Eval.Jobs.run(state.flake, opts) end) 99 + 100 + result = 101 + receive_progress(task, fn 102 + {:eval_started, attr} -> 103 + Output.item_start("Evaluating", attr || "(root)") 104 + 105 + {:eval_completed, attr, :ok} -> 106 + Output.item_done("Evaluated", attr || "(root)") 107 + 108 + {:eval_completed, attr, :branch} -> 109 + Output.item_done("Discovered", attr || "(root)") 110 + 111 + {:eval_completed, attr, _error} -> 112 + Output.item_error("Eval failed", attr || "(root)") 113 + end) 114 + 115 + case result do 98 116 {:ok, %{results: results}} -> 99 117 Output.eval_summary(results) 100 118 run_steps(rest, %{state | evals: results}) ··· 120 138 defp run_steps([:build | rest], %__MODULE__{} = state) do 121 139 Output.step("Building #{length(state.evals)} derivation(s)") 122 140 123 - case Nix.Build.Jobs.run(state.evals, max_workers: state.options.build_jobs) do 124 - {:ok, result} -> 125 - builds = Output.build_summary(result) 141 + opts = [ 142 + max_workers: state.options.build_jobs, 143 + notify_pid: self() 144 + ] 145 + 146 + task = Task.async(fn -> Nix.Build.Jobs.run(state.evals, opts) end) 147 + 148 + result = 149 + receive_progress(task, fn 150 + {:build_started, drv_path} -> 151 + Output.item_start("Building", drv_path || "(unknown)") 152 + 153 + {:build_completed, drv_path, :ok} -> 154 + Output.item_done("Built", drv_path || "(unknown)") 155 + 156 + {:build_completed, drv_path, _error} -> 157 + Output.item_error("Build failed", drv_path || "(unknown)") 158 + end) 159 + 160 + case result do 161 + {:ok, job_result} -> 162 + builds = Output.build_summary(job_result) 126 163 run_steps(rest, %{state | builds: builds}) 127 164 128 - {:error, result} -> 129 - builds = Output.build_summary(result) 165 + {:error, job_result} -> 166 + builds = Output.build_summary(job_result) 130 167 Output.build_errors(builds) 131 168 132 169 if state.flags.fail_fast do ··· 204 241 } 205 242 } 206 243 } = build -> 244 + seed_name = Map.get(seed_meta, "name", build.store_path) 245 + Output.item_start("Registering", seed_name) 207 246 tags = load_tags(state) ++ Map.get(seed_meta, "tags", []) ++ SowerCli.Repo.get_tags() 208 247 209 248 case seed_meta ··· 213 252 {:ok, seed} -> 214 253 case SowerClient.Seed.create(client, seed) do 215 254 {:ok, _} = result -> 255 + Output.item_done("Registered", seed_name) 216 256 result 217 257 218 258 {:error, reason} = error -> 259 + Output.item_error("Failed", seed_name) 219 260 Output.error("Failed to register seed: #{inspect(reason)}") 220 261 error 221 262 end 222 263 223 264 {:error, error} -> 265 + Output.item_error("Failed", seed_name) 224 266 Output.error("Failed to cast seed: #{inspect(error)}") 225 267 {:error, {:cast_failed, error}} 226 268 end ··· 251 293 defp load_tags(%__MODULE__{} = state) do 252 294 state.options.tag 253 295 |> Enum.map(&SowerClient.SeedTag.from_string/1) 296 + end 297 + 298 + defp receive_progress(task, handler) do 299 + receive do 300 + {ref, result} when ref == task.ref -> 301 + Process.demonitor(ref, [:flush]) 302 + result 303 + 304 + {:DOWN, ref, :process, _pid, reason} when ref == task.ref -> 305 + {:error, {:task_crashed, reason}} 306 + 307 + msg -> 308 + handler.(msg) 309 + receive_progress(task, handler) 310 + end 254 311 end 255 312 end
+21
apps/sower_cli/lib/sower_cli/output.ex
··· 32 32 end 33 33 34 34 @doc """ 35 + Print an item starting message. 36 + """ 37 + def item_start(action, name) do 38 + IO.puts(" #{IO.ANSI.yellow()}⋯#{IO.ANSI.reset()} #{action} #{name}") 39 + end 40 + 41 + @doc """ 42 + Print an item completed message. 43 + """ 44 + def item_done(action, name) do 45 + IO.puts(" #{IO.ANSI.green()}✓#{IO.ANSI.reset()} #{action} #{name}") 46 + end 47 + 48 + @doc """ 49 + Print an item error message. 50 + """ 51 + def item_error(action, name) do 52 + IO.puts(" #{IO.ANSI.red()}✗#{IO.ANSI.reset()} #{action} #{name}") 53 + end 54 + 55 + @doc """ 35 56 Print eval results summary. 36 57 """ 37 58 def eval_summary(results) do