Deployment and lifecycle management for Nix
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

chore: split orchestration out of context into relevant schema modules

+1265 -1638
+2 -2
apps/sower/lib/sower/authorization/permissions.ex
··· 34 34 org_id 35 35 ) do 36 36 permit 37 - |> read(Sower.Seed, org_id: org_id) 37 + |> read(Sower.Orchestration.Seed, org_id: org_id) 38 38 |> read(Sower.Nix.Cache, org_id: org_id) 39 39 end 40 40 ··· 44 44 org_id 45 45 ) do 46 46 permit 47 - |> all(Sower.Seed, org_id: org_id) 47 + |> all(Sower.Orchestration.Seed, org_id: org_id) 48 48 |> read(Sower.Nix.Cache, org_id: org_id) 49 49 end 50 50
+58 -1596
apps/sower/lib/sower/orchestration.ex
··· 3 3 The Orchestration context. 4 4 """ 5 5 6 - alias Sower.Repo 7 - alias Sower.Accounts.User 8 6 alias Sower.Orchestration.Agent 9 7 alias Sower.Orchestration.Deployment 10 - alias Sower.Orchestration.DeploymentPubSub 11 - 12 - import Ecto.Query, warn: false 13 - import Sower.Authorization 14 - 15 - require Logger 16 - 17 - @default_stale_after_seconds 2 * 60 * 60 18 - @default_stale_batch_size 100 19 - 20 - @doc """ 21 - Returns the list of agents. 22 - 23 - ## Examples 24 - 25 - iex> list_agents() 26 - [%Agent{}, ...] 27 - 28 - """ 29 - def list_agents do 30 - Repo.all(Agent) 31 - end 32 - 33 - @doc """ 34 - Returns the list of agents with their latest deployment preloaded. 35 - 36 - ## Examples 37 - 38 - iex> list_agents_with_latest_deployment() 39 - [%Agent{latest_deployment: %Deployment{} | nil}, ...] 40 - 41 - """ 42 - def list_agents_with_latest_deployment do 43 - latest_deployment_query = 44 - from(d in Deployment, 45 - where: d.agent_id == parent_as(:agent).id, 46 - order_by: [desc: d.inserted_at], 47 - limit: 1 48 - ) 49 - 50 - from(a in Agent, 51 - as: :agent, 52 - left_lateral_join: d in subquery(latest_deployment_query), 53 - on: true, 54 - select: %{a | latest_deployment: d} 55 - ) 56 - |> Repo.all() 57 - end 58 - 59 - def get_agent( 60 - %SowerClient.AgentHello{agent_sid: nil, name: name, local_sid: local_sid}, 61 - socket 62 - ) do 63 - case get_agent_local_sid(local_sid) do 64 - nil -> 65 - Logger.debug( 66 - msg: "Registering new agent", 67 - name: name, 68 - local_sid: local_sid 69 - ) 70 - 71 - if socket.assigns.access_token |> can() |> create?(Agent) do 72 - create_agent(%{name: name, local_sid: local_sid}) 73 - else 74 - {:error, :unauthorized} 75 - end 76 - 77 - %Agent{} = agent -> 78 - Logger.error( 79 - msg: "Local agent attempted to re-register existing agent", 80 - name: agent.name, 81 - local_sid: local_sid, 82 - existing_agent_sid: agent.sid 83 - ) 84 - 85 - {:error, :unauthorized_agent_hello} 86 - end 87 - end 88 - 89 - def get_agent( 90 - %SowerClient.AgentHello{agent_sid: agent_sid, name: name, local_sid: local_sid}, 91 - socket 92 - ) do 93 - case get_agent_sid(agent_sid) do 94 - nil -> 95 - Logger.debug( 96 - msg: "Local agent requested a missing agent", 97 - name: name, 98 - local_sid: local_sid, 99 - requested_agent_sid: agent_sid 100 - ) 101 - 102 - if socket.assigns.access_token |> can() |> create?(Agent) do 103 - create_agent(%{name: name, local_sid: local_sid}) 104 - else 105 - {:error, :unauthorized} 106 - end 107 - 108 - %Agent{local_sid: nil} = agent when agent.name == name -> 109 - Logger.debug( 110 - msg: "Registering local sid to existing agent", 111 - name: agent.name, 112 - local_sid: local_sid, 113 - agent_sid: agent.sid 114 - ) 115 - 116 - if socket.assigns.access_token |> can() |> create?(Agent) do 117 - agent = update_agent(agent, %{local_sid: local_sid}) 118 - 119 - {:ok, agent} 120 - else 121 - {:error, :unauthorized_agent_hello} 122 - end 123 - 124 - %Agent{} = agent 125 - when agent.sid == agent_sid and 126 - agent.name == name and 127 - agent.local_sid == local_sid -> 128 - Logger.debug( 129 - msg: "Found matching agent", 130 - name: agent.name, 131 - local_sid: local_sid, 132 - agent_sid: agent.sid 133 - ) 134 - 135 - {:ok, agent} 136 - 137 - %Agent{} = agent 138 - when agent.sid == agent_sid and 139 - agent.name != name and 140 - agent.local_sid == local_sid -> 141 - Logger.info( 142 - msg: "Found matching agent with different name, renaming", 143 - name: name, 144 - previous_name: agent.name, 145 - local_sid: local_sid, 146 - agent_sid: agent.sid 147 - ) 148 - 149 - {:ok, agent} = update_agent(agent, %{name: name}) 150 - 151 - {:ok, agent} 152 - 153 - %Agent{} = agent -> 154 - Logger.error( 155 - msg: "Invalid agent request", 156 - local_sid: local_sid, 157 - agent_sid: agent.sid 158 - ) 159 - 160 - {:error, :unauthorized_agent_hello} 161 - end 162 - end 163 - 164 - @doc """ 165 - Gets a single agent. 166 - 167 - Raises `Ecto.NoResultsError` if the Agent does not exist. 168 - 169 - ## Examples 170 - 171 - iex> get_agent!(123) 172 - %Agent{} 173 - 174 - iex> get_agent!(456) 175 - ** (Ecto.NoResultsError) 176 - 177 - """ 178 - def get_agent!(id), do: Repo.get!(Agent, id) 179 - 180 - @doc """ 181 - Gets a single agent by sid. 182 - 183 - Raises `Ecto.NoResultsError` if the Agent does not exist. 184 - 185 - ## Examples 186 - 187 - iex> get_agent_sid!("123") 188 - %Agent{} 189 - 190 - iex> get_agent_sid!("456") 191 - ** (Ecto.NoResultsError) 192 - 193 - """ 194 - def get_agent_sid!(sid), do: Repo.get_by!(Agent, sid: sid) 195 - 196 - @doc """ 197 - Gets a single agent by sid. 198 - 199 - ## Examples 200 - 201 - iex> get_agent_sid!("123") 202 - %Agent{} 203 - 204 - iex> get_agent_sid!("456") 205 - nil 206 - 207 - """ 208 - def get_agent_sid(sid), do: Repo.get_by(Agent, sid: sid) 209 - 210 - @doc """ 211 - Gets a single agent by local_sid. 212 - 213 - Raises `Ecto.NoResultsError` if the Agent does not exist. 214 - 215 - ## Examples 216 - 217 - iex> get_agent_local_sid!("123") 218 - %Agent{} 219 - 220 - iex> get_agent_local_sid!("456") 221 - nil 222 - 223 - """ 224 - def get_agent_local_sid(local_sid), do: Repo.get_by(Agent, local_sid: local_sid) 225 - 226 - @doc """ 227 - Gets a single agent by local_sid. 228 - 229 - Raises `Ecto.NoResultsError` if the Agent does not exist. 230 - 231 - ## Examples 232 - 233 - iex> get_agent_local_sid!("123") 234 - %Agent{} 235 - 236 - iex> get_agent_local_sid!("456") 237 - ** (Ecto.NoResultsError) 238 - 239 - """ 240 - def get_agent_local_sid!(local_sid), do: Repo.get_by!(Agent, local_sid: local_sid) 241 - 242 - @doc """ 243 - Creates a agent. 244 - 245 - ## Examples 246 - 247 - iex> create_agent(%{field: value}) 248 - {:ok, %Agent{}} 249 - 250 - iex> create_agent(%{field: bad_value}) 251 - {:error, %Ecto.Changeset{}} 252 - 253 - """ 254 - def create_agent(attrs \\ %{}) do 255 - %Agent{ 256 - org_id: Sower.Repo.get_org_id(), 257 - sid: SowerClient.Sid.generate("agent") 258 - } 259 - |> Agent.changeset(attrs) 260 - |> Repo.insert() 261 - end 262 - 263 - @doc """ 264 - Updates a agent. 265 - 266 - ## Examples 267 - 268 - iex> update_agent(agent, %{field: new_value}) 269 - {:ok, %Agent{}} 270 - 271 - iex> update_agent(agent, %{field: bad_value}) 272 - {:error, %Ecto.Changeset{}} 273 - 274 - """ 275 - def update_agent(%Agent{} = agent, attrs) do 276 - agent 277 - |> Agent.changeset(attrs) 278 - |> Repo.update() 279 - end 280 - 281 - @doc """ 282 - Deletes a agent. 283 - 284 - ## Examples 285 - 286 - iex> delete_agent(agent) 287 - {:ok, %Agent{}} 288 - 289 - iex> delete_agent(agent) 290 - {:error, %Ecto.Changeset{}} 291 - 292 - """ 293 - def delete_agent(%Agent{} = agent) do 294 - Repo.delete(agent) 295 - end 296 - 297 - @doc """ 298 - Returns an `%Ecto.Changeset{}` for tracking agent changes. 299 - 300 - ## Examples 301 - 302 - iex> change_agent(agent) 303 - %Ecto.Changeset{data: %Agent{}} 304 - 305 - """ 306 - def change_agent(%Agent{} = agent, attrs \\ %{}) do 307 - Agent.changeset(agent, attrs) 308 - end 309 - 310 8 alias Sower.Orchestration.Subscription 311 - 312 - @doc """ 313 - List deployments for a specific agent, ordered by most recent first. 314 - 315 - ## Options 316 - * `:limit` - Maximum number of deployments to return (default: 10) 317 - 318 - ## Examples 319 - 320 - iex> list_deployments(agent, limit: 10) 321 - [%Deployment{}, ...] 322 - """ 323 - def list_deployments(%Agent{} = agent, opts \\ []) do 324 - limit = Keyword.get(opts, :limit, 10) 325 - 326 - from(d in Deployment, 327 - where: d.agent_id == ^agent.id, 328 - order_by: [desc: d.inserted_at], 329 - limit: ^limit 330 - ) 331 - |> Repo.all() 332 - end 333 - 334 - @doc """ 335 - List unresolved deployments for a specific agent, oldest dispatch first. 336 - """ 337 - def list_unresolved_deployments_for_agent(%Agent{} = agent, opts \\ []) do 338 - limit = Keyword.get(opts, :limit) 339 - 340 - query = 341 - from(d in Deployment, 342 - where: d.agent_id == ^agent.id and is_nil(d.result), 343 - order_by: [ 344 - asc: fragment("COALESCE(?, ?)", d.last_dispatched_at, d.inserted_at), 345 - asc: d.inserted_at 346 - ] 347 - ) 348 - 349 - query = 350 - if is_integer(limit) and limit > 0 do 351 - from(d in query, limit: ^limit) 352 - else 353 - query 354 - end 355 - 356 - query 357 - |> Repo.all() 358 - |> Repo.preload([:subscriptions, seeds: [:tags]]) 359 - end 360 - 361 - @doc """ 362 - Replay unresolved deployments for an agent. 363 - """ 364 - def replay_unresolved_deployments(%Agent{} = agent, opts \\ []) do 365 - broadcast_fun = Keyword.get(opts, :broadcast_fun, &SowerWeb.Endpoint.broadcast/3) 366 - 367 - request_id_fun = 368 - Keyword.get(opts, :request_id_fun, fn -> SowerClient.Sid.generate("request") end) 369 - 370 - now = Keyword.get(opts, :now, DateTime.utc_now()) 371 - 372 - deployments = list_unresolved_deployments_for_agent(agent) 373 - mark_deployments_dispatched(deployments, now) 374 - 375 - Enum.each(deployments, fn deployment -> 376 - payload = deployment_event_payload(deployment, request_id_fun.()) 377 - broadcast_fun.("agent:#{agent.sid}", "deployment", payload) 378 - end) 379 - 380 - if deployments != [] do 381 - Logger.info( 382 - msg: "Replayed unresolved deployments", 383 - agent_sid: agent.sid, 384 - deployment_count: length(deployments), 385 - deployment_sids: Enum.map(deployments, & &1.sid) 386 - ) 387 - end 388 - 389 - {:ok, deployments} 390 - end 391 - 392 - @doc """ 393 - Returns the list of subscriptions. 394 - 395 - ## Examples 396 - 397 - iex> list_subscriptions() 398 - [%Subscription{}, ...] 399 - 400 - """ 401 - def list_subscriptions do 402 - Repo.all(Subscription) 403 - |> Sower.Repo.preload([:agent]) 404 - end 405 - 406 - @doc """ 407 - Returns the list of subscriptions for a given agent. 408 - """ 409 - def list_subscriptions_for_agent(%Agent{} = agent) do 410 - import Ecto.Query 411 - 412 - Subscription 413 - |> where([s], s.agent_id == ^agent.id) 414 - |> Repo.all() 415 - end 416 - 417 - @doc """ 418 - Gets a single subscription. 419 - 420 - Raises `Ecto.NoResultsError` if the Subscription does not exist. 421 - 422 - ## Examples 423 - 424 - iex> get_subscription!(123) 425 - %Subscription{} 426 - 427 - iex> get_subscription!(456) 428 - ** (Ecto.NoResultsError) 429 - 430 - """ 431 - def get_subscription!(id) do 432 - Repo.get!(Subscription, id) 433 - |> Sower.Repo.preload(:agent) 434 - end 435 - 436 - @doc """ 437 - Gets a single subscription by sid. 438 - 439 - Raises `Ecto.NoResultsError` if the Subscription does not exist. 440 - 441 - ## Examples 442 - 443 - iex> get_subscription_sid!(123) 444 - %Subscription{} 445 - 446 - iex> get_subscription_sid!(456) 447 - ** (Ecto.NoResultsError) 448 - 449 - """ 450 - def get_subscription_sid!(sid), do: Repo.get_by!(Subscription, sid: sid) 451 - 452 - def get_subscription_sid(sid) do 453 - Subscription 454 - |> Repo.get_by(sid: sid) 455 - end 456 - 457 - @doc """ 458 - Gets a single subscription by sid with deployments preloaded in reverse chronological order. 459 - 460 - Raises `Ecto.NoResultsError` if the Subscription does not exist. 461 - 462 - ## Examples 463 - 464 - iex> get_subscription_sid_with_deployments!("123") 465 - %Subscription{} 466 - 467 - iex> get_subscription_sid_with_deployments!("456") 468 - ** (Ecto.NoResultsError) 469 - 470 - """ 471 - def get_subscription_sid_with_deployments!(sid) do 472 - subscription = get_subscription_sid!(sid) 473 - 474 - Repo.preload(subscription, [ 475 - :agent, 476 - deployments: 477 - from(d in Deployment, 478 - order_by: [ 479 - desc: fragment("? IS NULL", d.deployed_at), 480 - desc: d.deployed_at, 481 - desc: d.inserted_at 482 - ] 483 - ) 484 - ]) 485 - end 486 - 487 - @doc """ 488 - Gets a single subscription by sid with deployments preloaded in reverse chronological order. 489 - 490 - Returns `nil` if the Subscription does not exist. 491 - 492 - ## Examples 493 - 494 - iex> get_subscription_sid_with_deployments("123") 495 - %Subscription{} 496 - 497 - iex> get_subscription_sid_with_deployments("456") 498 - nil 499 - 500 - """ 501 - def get_subscription_sid_with_deployments(sid) do 502 - get_subscription_sid(sid) 503 - |> Repo.preload([ 504 - :agent, 505 - deployments: 506 - from(d in Deployment, 507 - order_by: [ 508 - desc: fragment("? IS NULL", d.deployed_at), 509 - desc: d.deployed_at, 510 - desc: d.inserted_at 511 - ] 512 - ) 513 - ]) 514 - end 515 - 516 - def get_subscription_sids(sids) when is_list(sids) and length(sids) > 0 do 517 - query = from sub in Subscription, where: sub.sid in ^sids 518 - 519 - Repo.all(query) 520 - end 521 - 522 - def get_subscription_sids(sids) when is_list(sids) and length(sids) == 0 do 523 - {:error, :no_sids_provided} 524 - end 525 - 526 - def find_subscription(%Sower.Seed{} = seed) do 527 - rules_filter = 528 - Enum.map(seed.tags || [], fn tag -> 529 - %{key: tag.key, value: tag.value} 530 - end) 531 - 532 - from(s in Sower.Orchestration.Subscription, 533 - where: s.seed_name == ^seed.name, 534 - where: s.seed_type == ^seed.seed_type, 535 - where: 536 - fragment( 537 - """ 538 - NOT EXISTS ( 539 - SELECT 1 FROM jsonb_array_elements(?) AS r 540 - WHERE NOT EXISTS ( 541 - SELECT 1 FROM jsonb_array_elements(?) AS t 542 - WHERE t->>'key' = r->>'key' AND t->>'value' = r->>'value' 543 - ) 544 - ) 545 - """, 546 - s.rules, 547 - ^rules_filter 548 - ) 549 - ) 550 - |> Sower.Repo.all() 551 - end 552 - 553 - @doc """ 554 - Creates a subscription. 555 - 556 - ## Examples 557 - 558 - iex> create_subscription(%{field: value}) 559 - {:ok, %Subscription{}} 560 - 561 - iex> create_subscription(%{field: bad_value}) 562 - {:error, %Ecto.Changeset{}} 563 - 564 - """ 565 - def create_subscription(attrs \\ %{}) do 566 - case %Subscription{ 567 - org_id: Sower.Repo.get_org_id(), 568 - sid: SowerClient.Sid.generate("sub") 569 - } 570 - |> Subscription.changeset(attrs) 571 - |> Repo.insert( 572 - on_conflict: {:replace, [:updated_at, :rules]}, 573 - conflict_target: [:agent_id, :org_id, :seed_name, :seed_type], 574 - returning: true 575 - ) do 576 - {:ok, sub} -> {:ok, Repo.reload(sub)} 577 - err -> err 578 - end 579 - end 580 - 581 - @doc """ 582 - Register a subscription from a SowerClient.Orchestration.Subscription struct. 583 - 584 - ## Examples 585 - 586 - iex> register_subscription(req, agent_id) 587 - {:ok, %SowerClient.Orchestration.Subscription{}} 588 - 589 - iex> register_subscription(req, agent_id) 590 - {:error, %Ecto.Changeset{}} 591 - 592 - """ 593 - def register_subscription( 594 - %SowerClient.Orchestration.Subscription{ 595 - seed_name: seed_name, 596 - seed_type: seed_type, 597 - rules: rules 598 - }, 599 - agent_id 600 - ) do 601 - case create_subscription(%{ 602 - agent_id: agent_id, 603 - seed_name: seed_name, 604 - seed_type: seed_type, 605 - rules: rules 606 - }) do 607 - {:ok, subscription} -> 608 - {:ok, SowerClient.Orchestration.Subscription.cast!(subscription)} 609 - 610 - {:error, _} = error -> 611 - error 612 - end 613 - end 614 - 615 - @doc """ 616 - Sync subscriptions for an agent. Upserts all provided subscriptions 617 - and deletes any existing subscriptions not in the list. 618 - """ 619 - def sync_subscriptions(subscriptions, agent_id) do 620 - Repo.transaction(fn -> 621 - registered = 622 - Enum.map(subscriptions, fn sub -> 623 - case register_subscription(sub, agent_id) do 624 - {:ok, s} -> s 625 - {:error, reason} -> Repo.rollback(reason) 626 - end 627 - end) 628 - 629 - registered_sids = Enum.map(registered, & &1.sid) 630 - 631 - from(s in Subscription, 632 - where: s.agent_id == ^agent_id, 633 - where: s.sid not in ^registered_sids 634 - ) 635 - |> Repo.delete_all() 636 - 637 - registered 638 - end) 639 - end 640 - 641 - @doc """ 642 - Updates a subscription. 643 - 644 - ## Examples 645 - 646 - iex> update_subscription(subscription, %{field: new_value}) 647 - {:ok, %Subscription{}} 648 - 649 - iex> update_subscription(subscription, %{field: bad_value}) 650 - {:error, %Ecto.Changeset{}} 651 - 652 - """ 653 - def update_subscription(%Subscription{} = subscription, attrs) do 654 - subscription 655 - |> Subscription.changeset(attrs) 656 - |> Repo.update() 657 - end 658 - 659 - @doc """ 660 - Deletes a subscription. 661 - 662 - ## Examples 663 - 664 - iex> delete_subscription(subscription) 665 - {:ok, %Subscription{}} 666 - 667 - iex> delete_subscription(subscription) 668 - {:error, %Ecto.Changeset{}} 669 - 670 - """ 671 - def delete_subscription(%Subscription{} = subscription) do 672 - Repo.delete(subscription) 673 - end 674 - 675 - @doc """ 676 - Returns an `%Ecto.Changeset{}` for tracking subscription changes. 677 - 678 - ## Examples 679 - 680 - iex> change_subscription(subscription) 681 - %Ecto.Changeset{data: %Subscription{}} 682 - 683 - """ 684 - def change_subscription(%Subscription{} = subscription, attrs \\ %{}) do 685 - subscription 686 - |> Repo.preload(:agent) 687 - |> Subscription.changeset(attrs) 688 - end 689 - 690 - alias Sower.Seed 691 - 692 - def match_seed(%Subscription{} = subscription) do 693 - tags = 694 - Enum.map(subscription.rules || [], fn rule -> 695 - %{key: rule.key, value: rule.value} 696 - end) 697 - 698 - Seed.latest(subscription.seed_name, subscription.seed_type, tags) 699 - end 700 - 701 - def list_matching_seeds(%Subscription{} = subscription, limit \\ 10) do 702 - tags = 703 - Enum.map(subscription.rules || [], fn rule -> 704 - %{key: rule.key, value: rule.value} 705 - end) 706 - 707 - Seed.list_matching(subscription.seed_name, subscription.seed_type, tags, limit: limit) 708 - end 709 - 710 - @doc """ 711 - Returns the list of deployments. 712 - 713 - ## Examples 714 - 715 - iex> list_deployments() 716 - [%Deployment{}, ...] 717 - 718 - """ 719 - def list_deployments do 720 - query = 721 - from r in Deployment, 722 - order_by: [ 723 - desc: fragment("? IS NULL", r.deployed_at), 724 - desc: r.deployed_at, 725 - desc: r.inserted_at 726 - ] 727 - 728 - Repo.all(query) 729 - end 730 - 731 - @doc """ 732 - Gets a single deployment. 733 - 734 - Raises `Ecto.NoResultsError` if the Deployment does not exist. 735 - 736 - ## Examples 737 - 738 - iex> get_deployment!(123) 739 - %Deployment{} 740 - 741 - iex> get_deployment!(456) 742 - ** (Ecto.NoResultsError) 743 - 744 - """ 745 - def get_deployment!(id), do: Repo.get!(Deployment, id) 746 - 747 - def get_deployment_sid!(sid) do 748 - Deployment 749 - |> Repo.get_by!(sid: sid) 750 - end 751 - 752 - def get_deployment_sid(sid) do 753 - Deployment 754 - |> Repo.get_by(sid: sid) 755 - end 756 - 757 - @doc """ 758 - Creates a deployment. 759 - 760 - ## Examples 761 - 762 - iex> create_deployment(%{field: value}) 763 - {:ok, %Deployment{}} 764 - 765 - iex> create_deployment(%{field: bad_value}) 766 - {:error, %Ecto.Changeset{}} 767 - 768 - """ 769 - def create_deployment(attrs \\ %{}) do 770 - result = 771 - %Deployment{ 772 - org_id: Sower.Repo.get_org_id(), 773 - sid: SowerClient.Sid.generate("deploy") 774 - } 775 - |> Deployment.changeset(attrs) 776 - |> Repo.insert() 777 - 778 - case result do 779 - {:ok, deployment} -> 780 - DeploymentPubSub.broadcast_deployment_change(deployment, :created) 781 - 782 - {:error, _} = error -> 783 - error 784 - end 785 - end 786 - 787 - def retry_deployment(%Deployment{} = deployment, user_id) when is_integer(user_id) do 788 - Repo.transaction(fn -> 789 - user = Repo.get(User, user_id, skip_org_id: true) 790 - 791 - if is_nil(user) do 792 - Repo.rollback(:unauthorized) 793 - end 794 - 795 - deployment = 796 - from(d in Deployment, where: d.id == ^deployment.id, lock: "FOR UPDATE") 797 - |> Repo.one() 798 - |> Repo.preload([:seeds, :subscriptions]) 799 - 800 - cond do 801 - is_nil(deployment) -> 802 - Repo.rollback(:deployment_not_found) 803 - 804 - user.org_id != deployment.org_id -> 805 - Repo.rollback(:unauthorized) 806 - 807 - deployment.result not in [:success, :failure] -> 808 - Repo.rollback(:deployment_not_retryable) 809 - 810 - true -> 811 - retry_in_progress? = 812 - from(d in Deployment, 813 - where: d.parent_deployment_id == ^deployment.id and is_nil(d.result), 814 - limit: 1, 815 - select: d.id 816 - ) 817 - |> Repo.one() 818 - 819 - if retry_in_progress? do 820 - Repo.rollback(:retry_in_progress) 821 - else 822 - max_retry_ordinal = 823 - from(d in Deployment, 824 - where: d.parent_deployment_id == ^deployment.id, 825 - select: max(d.retry_ordinal) 826 - ) 827 - |> Repo.one() || 0 828 - 829 - attrs = %{ 830 - agent_id: deployment.agent_id, 831 - content_hash: deployment.content_hash, 832 - seeds: deployment.seeds, 833 - subscriptions: deployment.subscriptions, 834 - parent_deployment_id: deployment.id, 835 - retried_by_user_id: user_id, 836 - retry_ordinal: max_retry_ordinal + 1, 837 - retried_at: DateTime.utc_now(), 838 - last_dispatched_at: DateTime.utc_now() 839 - } 840 - 841 - case create_deployment(attrs) do 842 - {:ok, retry_deployment} -> 843 - retry_deployment = 844 - Repo.preload(retry_deployment, [:agent, :subscriptions, seeds: [:tags]]) 845 - 846 - request_id = SowerClient.Sid.generate("request") 847 - 848 - SowerWeb.Endpoint.broadcast( 849 - "agent:#{retry_deployment.agent.sid}", 850 - "deployment", 851 - deployment_event_payload(retry_deployment, request_id) 852 - ) 853 - 854 - retry_deployment 855 - 856 - {:error, changeset} -> 857 - Repo.rollback(changeset) 858 - end 859 - end 860 - end 861 - end) 862 - end 863 - 864 - @doc """ 865 - Updates a deployment. 866 - 867 - ## Examples 868 - 869 - iex> update_deployment(deployment, %{field: new_value}) 870 - {:ok, %Deployment{}} 871 - 872 - iex> update_deployment(deployment, %{field: bad_value}) 873 - {:error, %Ecto.Changeset{}} 874 - 875 - """ 876 - def update_deployment(%Deployment{} = deployment, attrs) do 877 - result = 878 - deployment 879 - |> Repo.preload([:seeds, :subscriptions]) 880 - |> Deployment.changeset(attrs) 881 - |> Repo.update() 882 - 883 - case result do 884 - {:ok, updated_deployment} -> 885 - DeploymentPubSub.broadcast_deployment_change(updated_deployment, :updated) 886 - 887 - {:error, _} = error -> 888 - error 889 - end 890 - end 891 - 892 - @doc """ 893 - Deletes a deployment. 894 - 895 - ## Examples 896 - 897 - iex> delete_deployment(deployment) 898 - {:ok, %Deployment{}} 899 - 900 - iex> delete_deployment(deployment) 901 - {:error, %Ecto.Changeset{}} 902 - 903 - """ 904 - def delete_deployment(%Deployment{} = deployment) do 905 - Repo.delete(deployment) 906 - end 907 - 908 - @doc """ 909 - Returns an `%Ecto.Changeset{}` for tracking deployment changes. 910 - 911 - ## Examples 912 - 913 - iex> change_deployment(deployment) 914 - %Ecto.Changeset{data: %Deployment{}} 915 - 916 - """ 917 - def change_deployment(%Deployment{} = deployment, attrs \\ %{}) do 918 - Deployment.changeset(deployment, attrs) 919 - end 920 - 921 - # Deployment Request Handling 922 - # 923 - # Entry points for deployment requests with different behaviors: 924 - # 925 - # - handle_deployment_request/2: Main entry point from agent channel 926 - # Flow: validate request → validate subscriptions → spawn async task 927 - # Task: match seeds → create deployment → broadcast to agent 928 - # 929 - # - request_deployment/2: Synchronous deployment (no broadcast) 930 - # Used for internal synchronous requests without async behavior 931 - # 932 - # - deploy_subscription/1: Deploy a single subscription 933 - # Matches subscription against available seeds, used by other functions 934 - 935 - @doc """ 936 - Initiates an async deployment for a single subscription. 937 - 938 - Looks up the subscription's agent, matches seeds, and spawns async task 939 - to create deployment and broadcast results back to agent. 940 - 941 - Returns {:ok, request_id} if async task starts successfully, 942 - {:error, reason} otherwise. 943 - 944 - ## Examples 945 - 946 - iex> deploy_subscription(subscription) 947 - {:ok, "request_abc123"} 948 - 949 - iex> deploy_subscription(subscription_with_no_agent) 950 - {:error, :agent_not_found} 951 - 952 - """ 953 - def deploy_subscription(%Subscription{} = sub, opts \\ []) do 954 - subscription = Repo.preload(sub, :agent) 955 - 956 - case subscription.agent do 957 - nil -> 958 - {:error, :agent_not_found} 959 - 960 - %Agent{} = agent -> 961 - request_id = SowerClient.Sid.generate("request") 962 - {:ok, request_id} = process_deployment(request_id, [subscription], agent, opts) 963 - {:ok, request_id} 964 - end 965 - end 966 - 967 - @doc """ 968 - Request and create a deployment for subscriptions (synchronous, no broadcast). 969 - 970 - Used internally for synchronous deployment requests without async broadcast. 971 - Validates subscriptions exist and match, then creates deployment record. 972 - 973 - Returns the structured Deployment response. 974 - 975 - ## Examples 976 - 977 - iex> request_deployment(deployment_request) 978 - {:ok, %SowerClient.Orchestration.Deployment{}} 979 - 980 - iex> request_deployment(invalid_request) 981 - {:error, :subscription_not_found} 982 - 983 - """ 984 - def request_deployment(%SowerClient.Orchestration.DeploymentRequest{} = request) do 985 - with {:ok, subs} <- validate_request_subscriptions(request.subscription_sids) do 986 - do_deployment(request.request_id, subs, force: request.force) 987 - else 988 - {:error, _} = err -> 989 - Logger.error(msg: "Failed to process deployment request", error: IO.inspect(err)) 990 - {:error, :unknown_error} 991 - end 992 - end 993 - 994 - defp validate_request_subscriptions(sids) when is_list(sids) and length(sids) > 0 do 995 - subs = get_subscription_sids(sids) 996 - subs = Repo.preload(subs, :agent) 997 - 998 - if subs == [] do 999 - {:error, :subscription_not_found} 1000 - else 1001 - {:ok, subs} 1002 - end 1003 - end 1004 - 1005 - defp validate_request_subscriptions(_), do: {:error, :subscription_not_found} 1006 - 1007 - @doc """ 1008 - Handle a deployment request from an agent channel. 1009 - 1010 - Validates the deployment request and subscriptions synchronously, 1011 - then delegates to process_deployment/3 for async processing and broadcasting. 1012 - 1013 - Returns {:ok, request_id} on successful validation, {:error, reason} otherwise. 1014 - """ 1015 - def handle_deployment_request(payload, agent) do 1016 - with {:ok, request} <- SowerClient.Orchestration.DeploymentRequest.cast(payload), 1017 - {:ok, subscriptions} <- validate_deployment_request(request, agent.id), 1018 - {:ok, request_id} <- 1019 - process_deployment(request.request_id, subscriptions, agent, force: request.force) do 1020 - {:ok, request_id} 1021 - end 1022 - end 1023 - 1024 - defp validate_deployment_request( 1025 - %SowerClient.Orchestration.DeploymentRequest{} = request, 1026 - agent_id 1027 - ) do 1028 - subs = get_subscription_sids(request.subscription_sids) 1029 - 1030 - cond do 1031 - subs == [] -> 1032 - {:error, :subscription_not_found} 1033 - 1034 - not Enum.all?(subs, &(&1.agent_id == agent_id)) -> 1035 - {:error, :unauthorized} 1036 - 1037 - true -> 1038 - {:ok, Repo.preload(subs, :agent)} 1039 - end 1040 - end 1041 - 1042 - @doc """ 1043 - Process a deployment request asynchronously. 1044 - 1045 - Spawns a fire-and-forget task to match seeds, create deployment record, and 1046 - broadcast results back to agent via channel. Validation happens synchronously 1047 - before task spawn. 1048 - 1049 - Returns {:ok, request_id} on success. 1050 - Returns {:error, reason} if validation fails. 1051 - """ 1052 - def process_deployment(request_id, subscriptions, %Agent{} = agent, opts \\ []) do 1053 - Task.Supervisor.start_child(Sower.TaskSupervisor, fn -> 1054 - Repo.put_org_id(agent.org_id) 1055 - 1056 - Logger.info( 1057 - msg: "Deployment processing started", 1058 - request_id: request_id, 1059 - agent_id: agent.id 1060 - ) 9 + alias Sower.Orchestration.AgentSeedGeneration 1061 10 1062 - case do_deployment(request_id, subscriptions, opts) do 1063 - {:ok, deployment} -> 1064 - Logger.info( 1065 - msg: "Deployment broadcast successful", 1066 - request_id: request_id, 1067 - deployment_sid: deployment.sid, 1068 - skipped: deployment.skipped 1069 - ) 11 + # Agent delegates 12 + defdelegate list_agents(), to: Agent 13 + defdelegate list_agents_with_latest_deployment(), to: Agent 14 + defdelegate get_agent(hello, socket), to: Agent 15 + defdelegate get_agent!(id), to: Agent 16 + defdelegate get_agent_sid!(sid), to: Agent 17 + defdelegate get_agent_sid(sid), to: Agent 18 + defdelegate get_agent_local_sid(local_sid), to: Agent 19 + defdelegate get_agent_local_sid!(local_sid), to: Agent 20 + defdelegate create_agent(attrs \\ %{}), to: Agent 21 + defdelegate update_agent(agent, attrs), to: Agent 22 + defdelegate delete_agent(agent), to: Agent 23 + defdelegate change_agent(agent, attrs \\ %{}), to: Agent 1070 24 1071 - SowerWeb.Endpoint.broadcast( 1072 - "agent:#{agent.sid}", 1073 - "deployment", 1074 - Map.from_struct(deployment) 1075 - ) 25 + # Subscription delegates 26 + defdelegate list_subscriptions(), to: Subscription 27 + defdelegate list_subscriptions_for_agent(agent), to: Subscription 28 + defdelegate get_subscription!(id), to: Subscription 29 + defdelegate get_subscription_sid!(sid), to: Subscription 30 + defdelegate get_subscription_sid(sid), to: Subscription 31 + defdelegate get_subscription_sid_with_deployments!(sid), to: Subscription 32 + defdelegate get_subscription_sid_with_deployments(sid), to: Subscription 33 + defdelegate get_subscription_sids(sids), to: Subscription 34 + defdelegate find_subscription(seed), to: Subscription 35 + defdelegate create_subscription(attrs \\ %{}), to: Subscription 36 + defdelegate register_subscription(req, agent_id), to: Subscription 37 + defdelegate sync_subscriptions(subscriptions, agent_id), to: Subscription 38 + defdelegate update_subscription(subscription, attrs), to: Subscription 39 + defdelegate delete_subscription(subscription), to: Subscription 40 + defdelegate change_subscription(subscription, attrs \\ %{}), to: Subscription 1076 41 1077 - {:error, reason} -> 1078 - Logger.error( 1079 - msg: "Deployment processing failed", 1080 - request_id: request_id, 1081 - reason: to_string(reason) 1082 - ) 42 + # Deployment delegates 43 + defdelegate list_deployments(), to: Deployment 44 + defdelegate list_deployments(agent, opts \\ []), to: Deployment 45 + defdelegate list_unresolved_deployments_for_agent(agent, opts \\ []), to: Deployment 46 + defdelegate get_deployment!(id), to: Deployment 47 + defdelegate get_deployment_sid!(sid), to: Deployment 48 + defdelegate get_deployment_sid(sid), to: Deployment 49 + defdelegate create_deployment(attrs \\ %{}), to: Deployment 50 + defdelegate update_deployment(deployment, attrs), to: Deployment 51 + defdelegate delete_deployment(deployment), to: Deployment 52 + defdelegate change_deployment(deployment, attrs \\ %{}), to: Deployment 53 + defdelegate retry_deployment(deployment, user_id), to: Deployment 54 + defdelegate replay_unresolved_deployments(agent, opts \\ []), to: Deployment 55 + defdelegate match_seed(subscription), to: Deployment 56 + defdelegate list_matching_seeds(subscription, limit \\ 10), to: Deployment 57 + defdelegate deploy_subscription(subscription, opts \\ []), to: Deployment 58 + defdelegate request_deployment(request), to: Deployment 59 + defdelegate handle_deployment_request(payload, agent), to: Deployment 60 + defdelegate process_deployment(request_id, subscriptions, agent, opts \\ []), to: Deployment 61 + defdelegate record_deployment(result), to: Deployment 62 + defdelegate finalize_stale_deployments(opts \\ []), to: Deployment 1083 63 1084 - SowerWeb.Endpoint.broadcast( 1085 - "agent:#{agent.sid}", 1086 - "deployment:error", 1087 - %{request_id: request_id, reason: to_string(reason)} 1088 - ) 1089 - end 1090 - end) 64 + # AgentSeedGeneration delegates 65 + defdelegate list_agent_seed_generation(agent), to: AgentSeedGeneration 66 + defdelegate list_current_seed_generation(agent), to: AgentSeedGeneration 67 + defdelegate list_agent_seed_generation_profile(agent_id, profile_id), to: AgentSeedGeneration 1091 68 1092 - {:ok, request_id} 1093 - end 69 + defdelegate upsert_agent_generation(agent_id, profile_id, seed_id, attrs), 70 + to: AgentSeedGeneration 1094 71 1095 - defp do_deployment(request_id, subscriptions, opts) do 1096 - force? = Keyword.get(opts, :force, false) 1097 - agent_id = hd(subscriptions).agent_id 1098 - 1099 - seed_deploys = 1100 - subscriptions 1101 - |> Enum.reduce([], fn sub, acc -> 1102 - case match_seed(sub) do 1103 - nil -> 1104 - acc 1105 - 1106 - seed -> 1107 - [ 1108 - %SowerClient.Orchestration.SeedDeployment{ 1109 - seed: seed, 1110 - subscription_sid: sub.sid 1111 - } 1112 - | acc 1113 - ] 1114 - end 1115 - end) 1116 - 1117 - seeds = Enum.map(seed_deploys, & &1.seed) 1118 - 1119 - if seeds == [] do 1120 - Logger.warning( 1121 - msg: "No matching seeds found for deployment request", 1122 - request_id: request_id, 1123 - subscription_count: length(subscriptions) 1124 - ) 1125 - 1126 - {:error, :seeds_not_found} 1127 - else 1128 - content_hash = compute_content_hash(seeds) 1129 - 1130 - Logger.debug( 1131 - msg: "Processing deployment with matched seeds", 1132 - request_id: request_id, 1133 - seed_count: length(seeds), 1134 - content_hash: content_hash 1135 - ) 1136 - 1137 - case find_duplicate_deployment(agent_id, content_hash, force?) do 1138 - {:skip, existing} -> 1139 - existing = Repo.preload(existing, [:seeds]) 1140 - 1141 - Logger.info( 1142 - msg: "Skipping deployment - duplicate found", 1143 - request_id: request_id, 1144 - deployment_sid: existing.sid, 1145 - content_hash: content_hash 1146 - ) 1147 - 1148 - {:ok, 1149 - %SowerClient.Orchestration.Deployment{ 1150 - request_id: request_id, 1151 - sid: existing.sid, 1152 - seed_deployments: seed_deploys, 1153 - skipped: true 1154 - }} 1155 - 1156 - :proceed -> 1157 - Logger.debug( 1158 - msg: "Creating new deployment record", 1159 - request_id: request_id, 1160 - agent_id: agent_id 1161 - ) 1162 - 1163 - case create_deployment(%{ 1164 - agent_id: agent_id, 1165 - content_hash: content_hash, 1166 - last_dispatched_at: DateTime.utc_now(), 1167 - seeds: seeds, 1168 - subscriptions: subscriptions 1169 - }) do 1170 - {:ok, deploy} -> 1171 - Logger.info( 1172 - msg: "Deployment record created successfully", 1173 - request_id: request_id, 1174 - deployment_sid: deploy.sid 1175 - ) 1176 - 1177 - {:ok, 1178 - %SowerClient.Orchestration.Deployment{ 1179 - request_id: request_id, 1180 - sid: deploy.sid, 1181 - seed_deployments: seed_deploys, 1182 - skipped: false 1183 - }} 1184 - 1185 - {:error, reason} -> 1186 - Logger.error( 1187 - msg: "Failed to create deployment record", 1188 - request_id: request_id, 1189 - reason: inspect(reason) 1190 - ) 1191 - 1192 - {:error, reason} 1193 - end 1194 - end 1195 - end 1196 - end 1197 - 1198 - defp compute_content_hash(seeds) do 1199 - seeds 1200 - |> Enum.map(& &1.id) 1201 - |> Enum.sort() 1202 - |> Enum.join(":") 1203 - |> then(&:crypto.hash(:sha256, &1)) 1204 - |> Base.encode16(case: :lower) 1205 - end 1206 - 1207 - defp find_duplicate_deployment(_agent_id, _content_hash, true), do: :proceed 1208 - 1209 - defp find_duplicate_deployment(agent_id, content_hash, false) do 1210 - query = 1211 - from(d in Deployment, 1212 - where: 1213 - d.agent_id == ^agent_id and 1214 - d.content_hash == ^content_hash and 1215 - (d.result == :success or is_nil(d.result)), 1216 - order_by: [desc: d.inserted_at], 1217 - limit: 1 1218 - ) 1219 - 1220 - case Repo.one(query) do 1221 - nil -> :proceed 1222 - deployment -> {:skip, deployment} 1223 - end 1224 - end 1225 - 1226 - def record_deployment(%SowerClient.Orchestration.DeploymentResult{} = result) do 1227 - case get_deployment_sid(result.deployment_sid) do 1228 - nil -> 1229 - {:error, :deployment_not_found} 1230 - 1231 - deploy -> 1232 - update_deployment(deploy, %{deployed_at: result.deployed_at, result: result.result}) 1233 - end 1234 - end 1235 - 1236 - @doc """ 1237 - Finalize stale unresolved deployments. 1238 - """ 1239 - def finalize_stale_deployments(opts \\ []) do 1240 - now = Keyword.get(opts, :now, DateTime.utc_now()) 1241 - stale_after_seconds = Keyword.get(opts, :stale_after_seconds, stale_after_seconds()) 1242 - batch_size = Keyword.get(opts, :batch_size, stale_batch_size()) 1243 - 1244 - if stale_after_seconds <= 0 or batch_size <= 0 do 1245 - {:ok, 0} 1246 - else 1247 - cutoff = DateTime.add(now, -stale_after_seconds, :second) 1248 - 1249 - stale_deployments = 1250 - from(d in Deployment, 1251 - where: is_nil(d.result), 1252 - where: fragment("COALESCE(?, ?) <= ?", d.last_dispatched_at, d.inserted_at, ^cutoff), 1253 - order_by: [ 1254 - asc: fragment("COALESCE(?, ?)", d.last_dispatched_at, d.inserted_at), 1255 - asc: d.inserted_at 1256 - ], 1257 - limit: ^batch_size 1258 - ) 1259 - |> Repo.all(skip_org_id: true) 1260 - 1261 - finalized = 1262 - Enum.reduce(stale_deployments, 0, fn deployment, acc -> 1263 - case finalize_stale_deployment(deployment, now) do 1264 - {:ok, _} -> acc + 1 1265 - _ -> acc 1266 - end 1267 - end) 1268 - 1269 - if finalized > 0 do 1270 - Logger.info( 1271 - msg: "Finalized stale deployments", 1272 - stale_after_seconds: stale_after_seconds, 1273 - batch_size: batch_size, 1274 - finalized_count: finalized 1275 - ) 1276 - end 1277 - 1278 - {:ok, finalized} 1279 - end 1280 - end 1281 - 1282 - defp deployment_event_payload(%Deployment{} = deployment, request_id) do 1283 - %SowerClient.Orchestration.Deployment{ 1284 - request_id: request_id, 1285 - sid: deployment.sid, 1286 - seed_deployments: build_seed_deployments(deployment.seeds, deployment.subscriptions), 1287 - skipped: false 1288 - } 1289 - end 1290 - 1291 - defp build_seed_deployments(seeds, subscriptions) do 1292 - Enum.map(seeds, fn seed -> 1293 - subscription_sid = 1294 - subscriptions 1295 - |> Enum.find(fn sub -> 1296 - sub.seed_name == seed.name and sub.seed_type == seed.seed_type 1297 - end) 1298 - |> case do 1299 - nil -> nil 1300 - sub -> sub.sid 1301 - end 1302 - 1303 - %SowerClient.Orchestration.SeedDeployment{ 1304 - seed: seed, 1305 - subscription_sid: subscription_sid 1306 - } 1307 - end) 1308 - end 1309 - 1310 - defp mark_deployments_dispatched([], _dispatched_at), do: :ok 1311 - 1312 - defp mark_deployments_dispatched(deployments, dispatched_at) do 1313 - ids = Enum.map(deployments, & &1.id) 1314 - now = DateTime.utc_now() 1315 - 1316 - from(d in Deployment, 1317 - where: d.id in ^ids and is_nil(d.result) 1318 - ) 1319 - |> Repo.update_all(set: [last_dispatched_at: dispatched_at, updated_at: now]) 1320 - 1321 - :ok 1322 - end 1323 - 1324 - defp finalize_stale_deployment(%Deployment{} = deployment, now) do 1325 - previous_org_id = Repo.get_org_id() 1326 - Repo.put_org_id(deployment.org_id) 1327 - 1328 - result = 1329 - case Repo.get(Deployment, deployment.id) do 1330 - nil -> 1331 - :ignore 1332 - 1333 - %Deployment{result: nil} = unresolved -> 1334 - update_deployment(unresolved, %{deployed_at: now, result: :failure}) 1335 - 1336 - %Deployment{} -> 1337 - :ignore 1338 - end 1339 - 1340 - Repo.put_org_id(previous_org_id) 1341 - 1342 - result 1343 - end 1344 - 1345 - defp stale_after_seconds do 1346 - config = Application.get_env(:sower, __MODULE__, []) 1347 - Keyword.get(config, :stale_after_seconds, @default_stale_after_seconds) 1348 - end 1349 - 1350 - defp stale_batch_size do 1351 - config = Application.get_env(:sower, __MODULE__, []) 1352 - Keyword.get(config, :stale_batch_size, @default_stale_batch_size) 1353 - end 1354 - 1355 - alias Sower.Orchestration.{NixProfile, AgentSeedGeneration} 1356 - 1357 - @doc """ 1358 - Lists all agent_seed_generations for an agent, ordered by generation_number descending. 1359 - Preloads seed and profile associations. 1360 - """ 1361 - def list_agent_seed_generation(%Agent{id: agent_id}) do 1362 - from(asg in AgentSeedGeneration, 1363 - where: asg.agent_id == ^agent_id, 1364 - order_by: [desc: asg.generation_number], 1365 - preload: [:seed, :profile] 1366 - ) 1367 - |> Repo.all() 1368 - end 1369 - 1370 - @doc """ 1371 - Lists only the current (active) generations for an agent. 1372 - Preloads seed and profile associations. 1373 - """ 1374 - def list_current_seed_generation(%Agent{id: agent_id}) do 1375 - from(asg in AgentSeedGeneration, 1376 - where: asg.agent_id == ^agent_id and asg.is_current == true, 1377 - preload: [:seed, :profile] 1378 - ) 1379 - |> Repo.all() 1380 - end 1381 - 1382 - @doc """ 1383 - Lists all generations for a specific agent and profile. 1384 - Ordered by generation_number descending. 1385 - """ 1386 - def list_agent_seed_generation_profile(agent_id, profile_id) do 1387 - from(asg in AgentSeedGeneration, 1388 - where: asg.agent_id == ^agent_id and asg.profile_id == ^profile_id, 1389 - order_by: [desc: asg.generation_number], 1390 - preload: [:seed, :profile] 1391 - ) 1392 - |> Repo.all() 1393 - end 1394 - 1395 - @doc """ 1396 - Upserts an agent_seed_generation from report data. 1397 - Uses lookup semantics on (agent_id, seed_id), inserting missing rows and 1398 - updating existing rows. 1399 - 1400 - ## Parameters 1401 - - agent_id: The agent's ID 1402 - - profile_id: The nix_profile's ID 1403 - - seed_id: The seed's ID 1404 - - attrs: Map with :generation_number, :is_current, :created_at_generation 1405 - """ 1406 - def upsert_agent_generation(agent_id, profile_id, seed_id, attrs) do 1407 - now = DateTime.utc_now() 1408 - 1409 - changeset_attrs = %{ 1410 - org_id: Repo.get_org_id(), 1411 - agent_id: agent_id, 1412 - seed_id: seed_id, 1413 - profile_id: profile_id, 1414 - generation_number: attrs.generation_number, 1415 - is_current: attrs.is_current, 1416 - created_at_generation: attrs.created_at_generation 1417 - } 1418 - 1419 - case Repo.get_by(AgentSeedGeneration, agent_id: agent_id, seed_id: seed_id) do 1420 - nil -> 1421 - %AgentSeedGeneration{} 1422 - |> AgentSeedGeneration.changeset(changeset_attrs) 1423 - |> Repo.insert() 1424 - 1425 - %AgentSeedGeneration{} = existing -> 1426 - update_attrs = %{ 1427 - profile_id: profile_id, 1428 - generation_number: attrs.generation_number, 1429 - is_current: attrs.is_current, 1430 - created_at_generation: attrs.created_at_generation, 1431 - updated_at: now 1432 - } 1433 - 1434 - if generation_row_changed?(existing, update_attrs) do 1435 - existing 1436 - |> AgentSeedGeneration.changeset(update_attrs) 1437 - |> Repo.update() 1438 - else 1439 - {:ok, existing} 1440 - end 1441 - end 1442 - end 1443 - 1444 - @doc """ 1445 - Updates agent_seed_generations from an agent's seeds report. 1446 - 1447 - For each profile in the report: 1448 - - Finds or creates the nix_profile by path 1449 - - For each generation, looks up the seed by artifact (store path) 1450 - - Upserts agent_seed_generation records for found seeds 1451 - - Deletes agent_seed_generations for seeds no longer reported by the agent 1452 - 1453 - Unknown artifacts are automatically registered as seeds with `agent_source` tag. 1454 - """ 1455 - def update_agent_seed_generations( 1456 - %SowerClient.Orchestration.AgentSeedsReport{} = report, 1457 - %Agent{} = agent 1458 - ) do 1459 - Repo.transaction(fn -> 1460 - if Enum.empty?(report.profiles) do 1461 - # Empty report means agent has no subscriptions - delete all generations 1462 - delete_all_agent_seed_generations(agent.id) 1463 - else 1464 - for profile <- report.profiles do 1465 - nix_profile = NixProfile.find_or_create!(profile.profile_path) 1466 - rows = resolve_profile_generation_rows(agent, profile) 1467 - sync_profile_generation_rows(agent, nix_profile, rows) 1468 - end 1469 - end 1470 - 1471 - :ok 1472 - end) 1473 - end 1474 - 1475 - defp resolve_profile_generation_rows(%Agent{} = agent, profile) do 1476 - artifacts = 1477 - profile.generations 1478 - |> Enum.map(& &1.path) 1479 - |> Enum.uniq() 1480 - 1481 - seeds_by_artifact = 1482 - from(s in Seed, where: s.artifact in ^artifacts) 1483 - |> Repo.all() 1484 - |> Map.new(&{&1.artifact, &1}) 1485 - 1486 - {rows, _seeds_by_artifact} = 1487 - Enum.reduce(profile.generations, {[], seeds_by_artifact}, fn gen, {rows, seeds} -> 1488 - {seed, seeds} = 1489 - case Map.get(seeds, gen.path) do 1490 - nil -> 1491 - case Seed.find_or_register(agent, gen, profile) do 1492 - {:ok, seed} -> 1493 - {seed, Map.put(seeds, gen.path, seed)} 1494 - 1495 - {:error, error} -> 1496 - Logger.warning( 1497 - msg: "Failed to auto-register seed from agent", 1498 - artifact: gen.path, 1499 - error: error 1500 - ) 1501 - 1502 - {nil, seeds} 1503 - end 1504 - 1505 - seed -> 1506 - {seed, seeds} 1507 - end 1508 - 1509 - case {seed, parse_generation_created(gen.created)} do 1510 - {nil, _} -> 1511 - {rows, seeds} 1512 - 1513 - {_, :error} -> 1514 - Logger.warning( 1515 - msg: "Failed to parse generation created timestamp", 1516 - artifact: gen.path, 1517 - created: gen.created 1518 - ) 1519 - 1520 - {rows, seeds} 1521 - 1522 - {%Seed{id: seed_id}, {:ok, created_at}} -> 1523 - row = %{ 1524 - seed_id: seed_id, 1525 - generation_number: gen.generation_number, 1526 - is_current: gen.is_current, 1527 - created_at_generation: created_at 1528 - } 1529 - 1530 - {[row | rows], seeds} 1531 - end 1532 - end) 1533 - 1534 - rows 1535 - |> Enum.reverse() 1536 - |> normalize_current_generation_rows() 1537 - |> Enum.reverse() 1538 - |> Enum.uniq_by(& &1.seed_id) 1539 - |> Enum.reverse() 1540 - end 1541 - 1542 - defp parse_generation_created(%DateTime{} = dt), do: {:ok, DateTime.truncate(dt, :second)} 1543 - 1544 - defp parse_generation_created(str) when is_binary(str) do 1545 - case DateTime.from_iso8601(str) do 1546 - {:ok, dt, _offset} -> {:ok, DateTime.truncate(dt, :second)} 1547 - _ -> :error 1548 - end 1549 - end 1550 - 1551 - defp parse_generation_created(_), do: :error 1552 - 1553 - defp normalize_current_generation_rows(rows) do 1554 - current_seed_id = 1555 - Enum.reduce(rows, nil, fn row, acc -> if row.is_current, do: row.seed_id, else: acc end) 1556 - 1557 - if is_nil(current_seed_id) do 1558 - rows 1559 - else 1560 - Enum.map(rows, fn row -> 1561 - %{row | is_current: row.seed_id == current_seed_id} 1562 - end) 1563 - end 1564 - end 1565 - 1566 - defp sync_profile_generation_rows(%Agent{} = agent, nix_profile, rows) do 1567 - if Enum.any?(rows, & &1.is_current) do 1568 - from(asg in AgentSeedGeneration, 1569 - where: asg.agent_id == ^agent.id and asg.profile_id == ^nix_profile.id 1570 - ) 1571 - |> Repo.update_all(set: [is_current: false]) 1572 - end 1573 - 1574 - keep_seed_ids = 1575 - Enum.reduce(rows, [], fn row, acc -> 1576 - upsert_agent_generation(agent.id, nix_profile.id, row.seed_id, row) 1577 - [row.seed_id | acc] 1578 - end) 1579 - |> Enum.uniq() 1580 - 1581 - delete_stale_agent_seed_generations(agent.id, nix_profile.id, keep_seed_ids) 1582 - end 1583 - 1584 - defp generation_row_changed?(existing, attrs) do 1585 - existing.profile_id != attrs.profile_id or 1586 - existing.generation_number != attrs.generation_number or 1587 - existing.is_current != attrs.is_current or 1588 - existing.created_at_generation != attrs.created_at_generation 1589 - end 1590 - 1591 - defp delete_stale_agent_seed_generations(agent_id, profile_id, keep_seed_ids) do 1592 - query = 1593 - from(asg in AgentSeedGeneration, 1594 - where: asg.agent_id == ^agent_id and asg.profile_id == ^profile_id 1595 - ) 1596 - 1597 - query = 1598 - if keep_seed_ids == [] do 1599 - query 1600 - else 1601 - from(asg in query, where: asg.seed_id not in ^keep_seed_ids) 1602 - end 1603 - 1604 - Repo.delete_all(query) 1605 - end 1606 - 1607 - defp delete_all_agent_seed_generations(agent_id) do 1608 - from(asg in AgentSeedGeneration, where: asg.agent_id == ^agent_id) 1609 - |> Repo.delete_all() 1610 - end 72 + defdelegate update_agent_seed_generations(report, agent), to: AgentSeedGeneration 1611 73 end
+166
apps/sower/lib/sower/orchestration/agent.ex
··· 1 1 defmodule Sower.Orchestration.Agent do 2 2 use Ecto.Schema 3 3 import Ecto.Changeset 4 + import Ecto.Query, warn: false 5 + import Sower.Authorization 6 + 7 + alias Sower.Repo 8 + alias Sower.Orchestration.Deployment 9 + 10 + require Logger 4 11 5 12 @derive {Jason.Encoder, only: [:sid, :local_sid]} 6 13 @derive {Phoenix.Param, key: :sid} ··· 25 32 agent 26 33 |> cast(attrs, [:name, :org_id, :local_sid]) 27 34 |> validate_required([:name]) 35 + end 36 + 37 + def list_agents do 38 + Repo.all(__MODULE__) 39 + end 40 + 41 + def list_agents_with_latest_deployment do 42 + latest_deployment_query = 43 + from(d in Deployment, 44 + where: d.agent_id == parent_as(:agent).id, 45 + order_by: [desc: d.inserted_at], 46 + limit: 1 47 + ) 48 + 49 + from(a in __MODULE__, 50 + as: :agent, 51 + left_lateral_join: d in subquery(latest_deployment_query), 52 + on: true, 53 + select: %{a | latest_deployment: d} 54 + ) 55 + |> Repo.all() 56 + end 57 + 58 + def get_agent( 59 + %SowerClient.AgentHello{agent_sid: nil, name: name, local_sid: local_sid}, 60 + socket 61 + ) do 62 + case get_agent_local_sid(local_sid) do 63 + nil -> 64 + Logger.debug( 65 + msg: "Registering new agent", 66 + name: name, 67 + local_sid: local_sid 68 + ) 69 + 70 + if socket.assigns.access_token |> can() |> create?(__MODULE__) do 71 + create_agent(%{name: name, local_sid: local_sid}) 72 + else 73 + {:error, :unauthorized} 74 + end 75 + 76 + %__MODULE__{} = agent -> 77 + Logger.error( 78 + msg: "Local agent attempted to re-register existing agent", 79 + name: agent.name, 80 + local_sid: local_sid, 81 + existing_agent_sid: agent.sid 82 + ) 83 + 84 + {:error, :unauthorized_agent_hello} 85 + end 86 + end 87 + 88 + def get_agent( 89 + %SowerClient.AgentHello{agent_sid: agent_sid, name: name, local_sid: local_sid}, 90 + socket 91 + ) do 92 + case get_agent_sid(agent_sid) do 93 + nil -> 94 + Logger.debug( 95 + msg: "Local agent requested a missing agent", 96 + name: name, 97 + local_sid: local_sid, 98 + requested_agent_sid: agent_sid 99 + ) 100 + 101 + if socket.assigns.access_token |> can() |> create?(__MODULE__) do 102 + create_agent(%{name: name, local_sid: local_sid}) 103 + else 104 + {:error, :unauthorized} 105 + end 106 + 107 + %__MODULE__{local_sid: nil} = agent when agent.name == name -> 108 + Logger.debug( 109 + msg: "Registering local sid to existing agent", 110 + name: agent.name, 111 + local_sid: local_sid, 112 + agent_sid: agent.sid 113 + ) 114 + 115 + if socket.assigns.access_token |> can() |> create?(__MODULE__) do 116 + agent = update_agent(agent, %{local_sid: local_sid}) 117 + 118 + {:ok, agent} 119 + else 120 + {:error, :unauthorized_agent_hello} 121 + end 122 + 123 + %__MODULE__{} = agent 124 + when agent.sid == agent_sid and 125 + agent.name == name and 126 + agent.local_sid == local_sid -> 127 + Logger.debug( 128 + msg: "Found matching agent", 129 + name: agent.name, 130 + local_sid: local_sid, 131 + agent_sid: agent.sid 132 + ) 133 + 134 + {:ok, agent} 135 + 136 + %__MODULE__{} = agent 137 + when agent.sid == agent_sid and 138 + agent.name != name and 139 + agent.local_sid == local_sid -> 140 + Logger.info( 141 + msg: "Found matching agent with different name, renaming", 142 + name: name, 143 + previous_name: agent.name, 144 + local_sid: local_sid, 145 + agent_sid: agent.sid 146 + ) 147 + 148 + {:ok, agent} = update_agent(agent, %{name: name}) 149 + 150 + {:ok, agent} 151 + 152 + %__MODULE__{} = agent -> 153 + Logger.error( 154 + msg: "Invalid agent request", 155 + local_sid: local_sid, 156 + agent_sid: agent.sid 157 + ) 158 + 159 + {:error, :unauthorized_agent_hello} 160 + end 161 + end 162 + 163 + def get_agent!(id), do: Repo.get!(__MODULE__, id) 164 + 165 + def get_agent_sid!(sid), do: Repo.get_by!(__MODULE__, sid: sid) 166 + 167 + def get_agent_sid(sid), do: Repo.get_by(__MODULE__, sid: sid) 168 + 169 + def get_agent_local_sid(local_sid), do: Repo.get_by(__MODULE__, local_sid: local_sid) 170 + 171 + def get_agent_local_sid!(local_sid), do: Repo.get_by!(__MODULE__, local_sid: local_sid) 172 + 173 + def create_agent(attrs \\ %{}) do 174 + %__MODULE__{ 175 + org_id: Sower.Repo.get_org_id(), 176 + sid: SowerClient.Sid.generate("agent") 177 + } 178 + |> changeset(attrs) 179 + |> Repo.insert() 180 + end 181 + 182 + def update_agent(%__MODULE__{} = agent, attrs) do 183 + agent 184 + |> changeset(attrs) 185 + |> Repo.update() 186 + end 187 + 188 + def delete_agent(%__MODULE__{} = agent) do 189 + Repo.delete(agent) 190 + end 191 + 192 + def change_agent(%__MODULE__{} = agent, attrs \\ %{}) do 193 + changeset(agent, attrs) 28 194 end 29 195 end
+226 -2
apps/sower/lib/sower/orchestration/agent_seed_generation.ex
··· 1 1 defmodule Sower.Orchestration.AgentSeedGeneration do 2 2 use Sower.Schema 3 3 import Ecto.Changeset 4 + import Ecto.Query, warn: false 4 5 5 - alias Sower.Orchestration.{Agent, NixProfile} 6 + alias Sower.Repo 7 + alias Sower.Orchestration.{Agent, NixProfile, Seed} 8 + 9 + require Logger 6 10 7 11 schema "agent_seed_generations" do 8 12 field :org_id, Ecto.UUID 9 13 10 14 belongs_to :agent, Agent 11 - belongs_to :seed, Sower.Seed 15 + belongs_to :seed, Seed 12 16 belongs_to :profile, NixProfile 13 17 14 18 field :generation_number, :integer ··· 36 40 |> foreign_key_constraint(:seed_id) 37 41 |> foreign_key_constraint(:profile_id) 38 42 |> unique_constraint([:agent_id, :seed_id]) 43 + end 44 + 45 + def list_agent_seed_generation(%Agent{id: agent_id}) do 46 + from(asg in __MODULE__, 47 + where: asg.agent_id == ^agent_id, 48 + order_by: [desc: asg.generation_number], 49 + preload: [:seed, :profile] 50 + ) 51 + |> Repo.all() 52 + end 53 + 54 + def list_current_seed_generation(%Agent{id: agent_id}) do 55 + from(asg in __MODULE__, 56 + where: asg.agent_id == ^agent_id and asg.is_current == true, 57 + preload: [:seed, :profile] 58 + ) 59 + |> Repo.all() 60 + end 61 + 62 + def list_agent_seed_generation_profile(agent_id, profile_id) do 63 + from(asg in __MODULE__, 64 + where: asg.agent_id == ^agent_id and asg.profile_id == ^profile_id, 65 + order_by: [desc: asg.generation_number], 66 + preload: [:seed, :profile] 67 + ) 68 + |> Repo.all() 69 + end 70 + 71 + def upsert_agent_generation(agent_id, profile_id, seed_id, attrs) do 72 + now = DateTime.utc_now() 73 + 74 + changeset_attrs = %{ 75 + org_id: Repo.get_org_id(), 76 + agent_id: agent_id, 77 + seed_id: seed_id, 78 + profile_id: profile_id, 79 + generation_number: attrs.generation_number, 80 + is_current: attrs.is_current, 81 + created_at_generation: attrs.created_at_generation 82 + } 83 + 84 + case Repo.get_by(__MODULE__, agent_id: agent_id, seed_id: seed_id) do 85 + nil -> 86 + %__MODULE__{} 87 + |> changeset(changeset_attrs) 88 + |> Repo.insert() 89 + 90 + %__MODULE__{} = existing -> 91 + update_attrs = %{ 92 + profile_id: profile_id, 93 + generation_number: attrs.generation_number, 94 + is_current: attrs.is_current, 95 + created_at_generation: attrs.created_at_generation, 96 + updated_at: now 97 + } 98 + 99 + if generation_row_changed?(existing, update_attrs) do 100 + existing 101 + |> changeset(update_attrs) 102 + |> Repo.update() 103 + else 104 + {:ok, existing} 105 + end 106 + end 107 + end 108 + 109 + def update_agent_seed_generations( 110 + %SowerClient.Orchestration.AgentSeedsReport{} = report, 111 + %Agent{} = agent 112 + ) do 113 + Repo.transaction(fn -> 114 + if Enum.empty?(report.profiles) do 115 + delete_all_agent_seed_generations(agent.id) 116 + else 117 + for profile <- report.profiles do 118 + nix_profile = NixProfile.find_or_create!(profile.profile_path) 119 + rows = resolve_profile_generation_rows(agent, profile) 120 + sync_profile_generation_rows(agent, nix_profile, rows) 121 + end 122 + end 123 + 124 + :ok 125 + end) 126 + end 127 + 128 + defp resolve_profile_generation_rows(%Agent{} = agent, profile) do 129 + artifacts = 130 + profile.generations 131 + |> Enum.map(& &1.path) 132 + |> Enum.uniq() 133 + 134 + seeds_by_artifact = 135 + from(s in Seed, where: s.artifact in ^artifacts) 136 + |> Repo.all() 137 + |> Map.new(&{&1.artifact, &1}) 138 + 139 + {rows, _seeds_by_artifact} = 140 + Enum.reduce(profile.generations, {[], seeds_by_artifact}, fn gen, {rows, seeds} -> 141 + {seed, seeds} = 142 + case Map.get(seeds, gen.path) do 143 + nil -> 144 + case Seed.find_or_register(agent, gen, profile) do 145 + {:ok, seed} -> 146 + {seed, Map.put(seeds, gen.path, seed)} 147 + 148 + {:error, error} -> 149 + Logger.warning( 150 + msg: "Failed to auto-register seed from agent", 151 + artifact: gen.path, 152 + error: error 153 + ) 154 + 155 + {nil, seeds} 156 + end 157 + 158 + seed -> 159 + {seed, seeds} 160 + end 161 + 162 + case {seed, parse_generation_created(gen.created)} do 163 + {nil, _} -> 164 + {rows, seeds} 165 + 166 + {_, :error} -> 167 + Logger.warning( 168 + msg: "Failed to parse generation created timestamp", 169 + artifact: gen.path, 170 + created: gen.created 171 + ) 172 + 173 + {rows, seeds} 174 + 175 + {%Seed{id: seed_id}, {:ok, created_at}} -> 176 + row = %{ 177 + seed_id: seed_id, 178 + generation_number: gen.generation_number, 179 + is_current: gen.is_current, 180 + created_at_generation: created_at 181 + } 182 + 183 + {[row | rows], seeds} 184 + end 185 + end) 186 + 187 + rows 188 + |> Enum.reverse() 189 + |> normalize_current_generation_rows() 190 + |> Enum.reverse() 191 + |> Enum.uniq_by(& &1.seed_id) 192 + |> Enum.reverse() 193 + end 194 + 195 + defp parse_generation_created(%DateTime{} = dt), do: {:ok, DateTime.truncate(dt, :second)} 196 + 197 + defp parse_generation_created(str) when is_binary(str) do 198 + case DateTime.from_iso8601(str) do 199 + {:ok, dt, _offset} -> {:ok, DateTime.truncate(dt, :second)} 200 + _ -> :error 201 + end 202 + end 203 + 204 + defp parse_generation_created(_), do: :error 205 + 206 + defp normalize_current_generation_rows(rows) do 207 + current_seed_id = 208 + Enum.reduce(rows, nil, fn row, acc -> if row.is_current, do: row.seed_id, else: acc end) 209 + 210 + if is_nil(current_seed_id) do 211 + rows 212 + else 213 + Enum.map(rows, fn row -> 214 + %{row | is_current: row.seed_id == current_seed_id} 215 + end) 216 + end 217 + end 218 + 219 + defp sync_profile_generation_rows(%Agent{} = agent, nix_profile, rows) do 220 + if Enum.any?(rows, & &1.is_current) do 221 + from(asg in __MODULE__, 222 + where: asg.agent_id == ^agent.id and asg.profile_id == ^nix_profile.id 223 + ) 224 + |> Repo.update_all(set: [is_current: false]) 225 + end 226 + 227 + keep_seed_ids = 228 + Enum.reduce(rows, [], fn row, acc -> 229 + upsert_agent_generation(agent.id, nix_profile.id, row.seed_id, row) 230 + [row.seed_id | acc] 231 + end) 232 + |> Enum.uniq() 233 + 234 + delete_stale_agent_seed_generations(agent.id, nix_profile.id, keep_seed_ids) 235 + end 236 + 237 + defp generation_row_changed?(existing, attrs) do 238 + existing.profile_id != attrs.profile_id or 239 + existing.generation_number != attrs.generation_number or 240 + existing.is_current != attrs.is_current or 241 + existing.created_at_generation != attrs.created_at_generation 242 + end 243 + 244 + defp delete_stale_agent_seed_generations(agent_id, profile_id, keep_seed_ids) do 245 + query = 246 + from(asg in __MODULE__, 247 + where: asg.agent_id == ^agent_id and asg.profile_id == ^profile_id 248 + ) 249 + 250 + query = 251 + if keep_seed_ids == [] do 252 + query 253 + else 254 + from(asg in query, where: asg.seed_id not in ^keep_seed_ids) 255 + end 256 + 257 + Repo.delete_all(query) 258 + end 259 + 260 + defp delete_all_agent_seed_generations(agent_id) do 261 + from(asg in __MODULE__, where: asg.agent_id == ^agent_id) 262 + |> Repo.delete_all() 39 263 end 40 264 end
+610 -5
apps/sower/lib/sower/orchestration/deployment.ex
··· 1 1 defmodule Sower.Orchestration.Deployment do 2 2 use Sower.Schema 3 3 import Ecto.Changeset 4 + import Ecto.Query, warn: false 4 5 6 + alias Sower.Repo 7 + alias Sower.Accounts.User 5 8 alias Sower.Orchestration 9 + alias Sower.Orchestration.{Agent, Seed, Subscription, DeploymentPubSub} 10 + 11 + require Logger 12 + 13 + @default_stale_after_seconds 2 * 60 * 60 14 + @default_stale_batch_size 100 6 15 7 16 @derive {Jason.Encoder, only: [:sid]} 8 17 @derive {Phoenix.Param, key: :sid} ··· 11 20 field :sid, SowerClient.Sid, autogenerate: true 12 21 field :org_id, Ecto.UUID 13 22 14 - belongs_to :agent, Sower.Orchestration.Agent 23 + belongs_to :agent, Agent 15 24 belongs_to :parent_deployment, __MODULE__ 16 25 has_many :retries, __MODULE__, foreign_key: :parent_deployment_id 17 - belongs_to :retried_by_user, Sower.Accounts.User 26 + belongs_to :retried_by_user, User 18 27 19 - many_to_many :subscriptions, Sower.Orchestration.Subscription, 20 - join_through: Orchestration.SubscriptionDeployment 28 + many_to_many :subscriptions, Subscription, join_through: Orchestration.SubscriptionDeployment 21 29 22 - many_to_many :seeds, Sower.Seed, join_through: Orchestration.SeedDeployment 30 + many_to_many :seeds, Seed, join_through: Orchestration.SeedDeployment 23 31 24 32 field :deployed_at, :utc_datetime 25 33 field :result, Ecto.Enum, values: [:success, :failure, :partial] ··· 49 57 |> put_assoc(:subscriptions, Map.get(attrs, :subscriptions, deployment.subscriptions)) 50 58 |> validate_number(:retry_ordinal, greater_than: 0) 51 59 |> validate_required([]) 60 + end 61 + 62 + # CRUD 63 + 64 + def list_deployments do 65 + query = 66 + from(r in __MODULE__, 67 + order_by: [ 68 + desc: fragment("? IS NULL", r.deployed_at), 69 + desc: r.deployed_at, 70 + desc: r.inserted_at 71 + ] 72 + ) 73 + 74 + Repo.all(query) 75 + end 76 + 77 + def list_deployments(%Agent{} = agent, opts \\ []) do 78 + limit = Keyword.get(opts, :limit, 10) 79 + 80 + from(d in __MODULE__, 81 + where: d.agent_id == ^agent.id, 82 + order_by: [desc: d.inserted_at], 83 + limit: ^limit 84 + ) 85 + |> Repo.all() 86 + end 87 + 88 + def list_unresolved_deployments_for_agent(%Agent{} = agent, opts \\ []) do 89 + limit = Keyword.get(opts, :limit) 90 + 91 + query = 92 + from(d in __MODULE__, 93 + where: d.agent_id == ^agent.id and is_nil(d.result), 94 + order_by: [ 95 + asc: fragment("COALESCE(?, ?)", d.last_dispatched_at, d.inserted_at), 96 + asc: d.inserted_at 97 + ] 98 + ) 99 + 100 + query = 101 + if is_integer(limit) and limit > 0 do 102 + from(d in query, limit: ^limit) 103 + else 104 + query 105 + end 106 + 107 + query 108 + |> Repo.all() 109 + |> Repo.preload([:subscriptions, seeds: [:tags]]) 110 + end 111 + 112 + def get_deployment!(id), do: Repo.get!(__MODULE__, id) 113 + 114 + def get_deployment_sid!(sid), do: Repo.get_by!(__MODULE__, sid: sid) 115 + 116 + def get_deployment_sid(sid), do: Repo.get_by(__MODULE__, sid: sid) 117 + 118 + def create_deployment(attrs \\ %{}) do 119 + result = 120 + %__MODULE__{ 121 + org_id: Repo.get_org_id(), 122 + sid: SowerClient.Sid.generate("deploy") 123 + } 124 + |> changeset(attrs) 125 + |> Repo.insert() 126 + 127 + case result do 128 + {:ok, deployment} -> 129 + DeploymentPubSub.broadcast_deployment_change(deployment, :created) 130 + 131 + {:error, _} = error -> 132 + error 133 + end 134 + end 135 + 136 + def update_deployment(%__MODULE__{} = deployment, attrs) do 137 + result = 138 + deployment 139 + |> Repo.preload([:seeds, :subscriptions]) 140 + |> changeset(attrs) 141 + |> Repo.update() 142 + 143 + case result do 144 + {:ok, updated_deployment} -> 145 + DeploymentPubSub.broadcast_deployment_change(updated_deployment, :updated) 146 + 147 + {:error, _} = error -> 148 + error 149 + end 150 + end 151 + 152 + def delete_deployment(%__MODULE__{} = deployment) do 153 + Repo.delete(deployment) 154 + end 155 + 156 + def change_deployment(%__MODULE__{} = deployment, attrs \\ %{}) do 157 + changeset(deployment, attrs) 158 + end 159 + 160 + # Retry 161 + 162 + def retry_deployment(%__MODULE__{} = deployment, user_id) when is_integer(user_id) do 163 + Repo.transaction(fn -> 164 + user = Repo.get(User, user_id, skip_org_id: true) 165 + 166 + if is_nil(user) do 167 + Repo.rollback(:unauthorized) 168 + end 169 + 170 + deployment = 171 + from(d in __MODULE__, where: d.id == ^deployment.id, lock: "FOR UPDATE") 172 + |> Repo.one() 173 + |> Repo.preload([:seeds, :subscriptions]) 174 + 175 + cond do 176 + is_nil(deployment) -> 177 + Repo.rollback(:deployment_not_found) 178 + 179 + user.org_id != deployment.org_id -> 180 + Repo.rollback(:unauthorized) 181 + 182 + deployment.result not in [:success, :failure] -> 183 + Repo.rollback(:deployment_not_retryable) 184 + 185 + true -> 186 + retry_in_progress? = 187 + from(d in __MODULE__, 188 + where: d.parent_deployment_id == ^deployment.id and is_nil(d.result), 189 + limit: 1, 190 + select: d.id 191 + ) 192 + |> Repo.one() 193 + 194 + if retry_in_progress? do 195 + Repo.rollback(:retry_in_progress) 196 + else 197 + max_retry_ordinal = 198 + from(d in __MODULE__, 199 + where: d.parent_deployment_id == ^deployment.id, 200 + select: max(d.retry_ordinal) 201 + ) 202 + |> Repo.one() || 0 203 + 204 + attrs = %{ 205 + agent_id: deployment.agent_id, 206 + content_hash: deployment.content_hash, 207 + seeds: deployment.seeds, 208 + subscriptions: deployment.subscriptions, 209 + parent_deployment_id: deployment.id, 210 + retried_by_user_id: user_id, 211 + retry_ordinal: max_retry_ordinal + 1, 212 + retried_at: DateTime.utc_now(), 213 + last_dispatched_at: DateTime.utc_now() 214 + } 215 + 216 + case create_deployment(attrs) do 217 + {:ok, retry_deployment} -> 218 + retry_deployment = 219 + Repo.preload(retry_deployment, [:agent, :subscriptions, seeds: [:tags]]) 220 + 221 + request_id = SowerClient.Sid.generate("request") 222 + 223 + SowerWeb.Endpoint.broadcast( 224 + "agent:#{retry_deployment.agent.sid}", 225 + "deployment", 226 + deployment_event_payload(retry_deployment, request_id) 227 + ) 228 + 229 + retry_deployment 230 + 231 + {:error, changeset} -> 232 + Repo.rollback(changeset) 233 + end 234 + end 235 + end 236 + end) 237 + end 238 + 239 + # Replay 240 + 241 + def replay_unresolved_deployments(%Agent{} = agent, opts \\ []) do 242 + broadcast_fun = Keyword.get(opts, :broadcast_fun, &SowerWeb.Endpoint.broadcast/3) 243 + 244 + request_id_fun = 245 + Keyword.get(opts, :request_id_fun, fn -> SowerClient.Sid.generate("request") end) 246 + 247 + now = Keyword.get(opts, :now, DateTime.utc_now()) 248 + 249 + deployments = list_unresolved_deployments_for_agent(agent) 250 + mark_deployments_dispatched(deployments, now) 251 + 252 + Enum.each(deployments, fn deployment -> 253 + payload = deployment_event_payload(deployment, request_id_fun.()) 254 + broadcast_fun.("agent:#{agent.sid}", "deployment", payload) 255 + end) 256 + 257 + if deployments != [] do 258 + Logger.info( 259 + msg: "Replayed unresolved deployments", 260 + agent_sid: agent.sid, 261 + deployment_count: length(deployments), 262 + deployment_sids: Enum.map(deployments, & &1.sid) 263 + ) 264 + end 265 + 266 + {:ok, deployments} 267 + end 268 + 269 + # Seed matching 270 + 271 + def match_seed(%Subscription{} = subscription) do 272 + tags = 273 + Enum.map(subscription.rules || [], fn rule -> 274 + %{key: rule.key, value: rule.value} 275 + end) 276 + 277 + Seed.latest(subscription.seed_name, subscription.seed_type, tags) 278 + end 279 + 280 + def list_matching_seeds(%Subscription{} = subscription, limit \\ 10) do 281 + tags = 282 + Enum.map(subscription.rules || [], fn rule -> 283 + %{key: rule.key, value: rule.value} 284 + end) 285 + 286 + Seed.list_matching(subscription.seed_name, subscription.seed_type, tags, limit: limit) 287 + end 288 + 289 + # Deployment request handling 290 + 291 + def deploy_subscription(%Subscription{} = sub, opts \\ []) do 292 + subscription = Repo.preload(sub, :agent) 293 + 294 + case subscription.agent do 295 + nil -> 296 + {:error, :agent_not_found} 297 + 298 + %Agent{} = agent -> 299 + request_id = SowerClient.Sid.generate("request") 300 + {:ok, request_id} = process_deployment(request_id, [subscription], agent, opts) 301 + {:ok, request_id} 302 + end 303 + end 304 + 305 + def request_deployment(%SowerClient.Orchestration.DeploymentRequest{} = request) do 306 + with {:ok, subs} <- validate_request_subscriptions(request.subscription_sids) do 307 + do_deployment(request.request_id, subs, force: request.force) 308 + else 309 + {:error, _} = err -> 310 + Logger.error(msg: "Failed to process deployment request", error: IO.inspect(err)) 311 + {:error, :unknown_error} 312 + end 313 + end 314 + 315 + def handle_deployment_request(payload, agent) do 316 + with {:ok, request} <- SowerClient.Orchestration.DeploymentRequest.cast(payload), 317 + {:ok, subscriptions} <- validate_deployment_request(request, agent.id), 318 + {:ok, request_id} <- 319 + process_deployment(request.request_id, subscriptions, agent, force: request.force) do 320 + {:ok, request_id} 321 + end 322 + end 323 + 324 + def process_deployment(request_id, subscriptions, %Agent{} = agent, opts \\ []) do 325 + Task.Supervisor.start_child(Sower.TaskSupervisor, fn -> 326 + Repo.put_org_id(agent.org_id) 327 + 328 + Logger.info( 329 + msg: "Deployment processing started", 330 + request_id: request_id, 331 + agent_id: agent.id 332 + ) 333 + 334 + case do_deployment(request_id, subscriptions, opts) do 335 + {:ok, deployment} -> 336 + Logger.info( 337 + msg: "Deployment broadcast successful", 338 + request_id: request_id, 339 + deployment_sid: deployment.sid, 340 + skipped: deployment.skipped 341 + ) 342 + 343 + SowerWeb.Endpoint.broadcast( 344 + "agent:#{agent.sid}", 345 + "deployment", 346 + Map.from_struct(deployment) 347 + ) 348 + 349 + {:error, reason} -> 350 + Logger.error( 351 + msg: "Deployment processing failed", 352 + request_id: request_id, 353 + reason: to_string(reason) 354 + ) 355 + 356 + SowerWeb.Endpoint.broadcast( 357 + "agent:#{agent.sid}", 358 + "deployment:error", 359 + %{request_id: request_id, reason: to_string(reason)} 360 + ) 361 + end 362 + end) 363 + 364 + {:ok, request_id} 365 + end 366 + 367 + def record_deployment(%SowerClient.Orchestration.DeploymentResult{} = result) do 368 + case get_deployment_sid(result.deployment_sid) do 369 + nil -> 370 + {:error, :deployment_not_found} 371 + 372 + deploy -> 373 + update_deployment(deploy, %{deployed_at: result.deployed_at, result: result.result}) 374 + end 375 + end 376 + 377 + # Stale deployment finalization 378 + 379 + def finalize_stale_deployments(opts \\ []) do 380 + now = Keyword.get(opts, :now, DateTime.utc_now()) 381 + stale_after_seconds = Keyword.get(opts, :stale_after_seconds, stale_after_seconds()) 382 + batch_size = Keyword.get(opts, :batch_size, stale_batch_size()) 383 + 384 + if stale_after_seconds <= 0 or batch_size <= 0 do 385 + {:ok, 0} 386 + else 387 + cutoff = DateTime.add(now, -stale_after_seconds, :second) 388 + 389 + stale_deployments = 390 + from(d in __MODULE__, 391 + where: is_nil(d.result), 392 + where: fragment("COALESCE(?, ?) <= ?", d.last_dispatched_at, d.inserted_at, ^cutoff), 393 + order_by: [ 394 + asc: fragment("COALESCE(?, ?)", d.last_dispatched_at, d.inserted_at), 395 + asc: d.inserted_at 396 + ], 397 + limit: ^batch_size 398 + ) 399 + |> Repo.all(skip_org_id: true) 400 + 401 + finalized = 402 + Enum.reduce(stale_deployments, 0, fn deployment, acc -> 403 + case finalize_stale_deployment(deployment, now) do 404 + {:ok, _} -> acc + 1 405 + _ -> acc 406 + end 407 + end) 408 + 409 + if finalized > 0 do 410 + Logger.info( 411 + msg: "Finalized stale deployments", 412 + stale_after_seconds: stale_after_seconds, 413 + batch_size: batch_size, 414 + finalized_count: finalized 415 + ) 416 + end 417 + 418 + {:ok, finalized} 419 + end 420 + end 421 + 422 + # Private helpers 423 + 424 + defp validate_request_subscriptions(sids) when is_list(sids) and length(sids) > 0 do 425 + subs = Subscription.get_subscription_sids(sids) 426 + subs = Repo.preload(subs, :agent) 427 + 428 + if subs == [] do 429 + {:error, :subscription_not_found} 430 + else 431 + {:ok, subs} 432 + end 433 + end 434 + 435 + defp validate_request_subscriptions(_), do: {:error, :subscription_not_found} 436 + 437 + defp validate_deployment_request( 438 + %SowerClient.Orchestration.DeploymentRequest{} = request, 439 + agent_id 440 + ) do 441 + subs = Subscription.get_subscription_sids(request.subscription_sids) 442 + 443 + cond do 444 + subs == [] -> 445 + {:error, :subscription_not_found} 446 + 447 + not Enum.all?(subs, &(&1.agent_id == agent_id)) -> 448 + {:error, :unauthorized} 449 + 450 + true -> 451 + {:ok, Repo.preload(subs, :agent)} 452 + end 453 + end 454 + 455 + defp do_deployment(request_id, subscriptions, opts) do 456 + force? = Keyword.get(opts, :force, false) 457 + agent_id = hd(subscriptions).agent_id 458 + 459 + seed_deploys = 460 + subscriptions 461 + |> Enum.reduce([], fn sub, acc -> 462 + case match_seed(sub) do 463 + nil -> 464 + acc 465 + 466 + seed -> 467 + [ 468 + %SowerClient.Orchestration.SeedDeployment{ 469 + seed: seed, 470 + subscription_sid: sub.sid 471 + } 472 + | acc 473 + ] 474 + end 475 + end) 476 + 477 + seeds = Enum.map(seed_deploys, & &1.seed) 478 + 479 + if seeds == [] do 480 + Logger.warning( 481 + msg: "No matching seeds found for deployment request", 482 + request_id: request_id, 483 + subscription_count: length(subscriptions) 484 + ) 485 + 486 + {:error, :seeds_not_found} 487 + else 488 + content_hash = compute_content_hash(seeds) 489 + 490 + Logger.debug( 491 + msg: "Processing deployment with matched seeds", 492 + request_id: request_id, 493 + seed_count: length(seeds), 494 + content_hash: content_hash 495 + ) 496 + 497 + case find_duplicate_deployment(agent_id, content_hash, force?) do 498 + {:skip, existing} -> 499 + existing = Repo.preload(existing, [:seeds]) 500 + 501 + Logger.info( 502 + msg: "Skipping deployment - duplicate found", 503 + request_id: request_id, 504 + deployment_sid: existing.sid, 505 + content_hash: content_hash 506 + ) 507 + 508 + {:ok, 509 + %SowerClient.Orchestration.Deployment{ 510 + request_id: request_id, 511 + sid: existing.sid, 512 + seed_deployments: seed_deploys, 513 + skipped: true 514 + }} 515 + 516 + :proceed -> 517 + Logger.debug( 518 + msg: "Creating new deployment record", 519 + request_id: request_id, 520 + agent_id: agent_id 521 + ) 522 + 523 + case create_deployment(%{ 524 + agent_id: agent_id, 525 + content_hash: content_hash, 526 + last_dispatched_at: DateTime.utc_now(), 527 + seeds: seeds, 528 + subscriptions: subscriptions 529 + }) do 530 + {:ok, deploy} -> 531 + Logger.info( 532 + msg: "Deployment record created successfully", 533 + request_id: request_id, 534 + deployment_sid: deploy.sid 535 + ) 536 + 537 + {:ok, 538 + %SowerClient.Orchestration.Deployment{ 539 + request_id: request_id, 540 + sid: deploy.sid, 541 + seed_deployments: seed_deploys, 542 + skipped: false 543 + }} 544 + 545 + {:error, reason} -> 546 + Logger.error( 547 + msg: "Failed to create deployment record", 548 + request_id: request_id, 549 + reason: inspect(reason) 550 + ) 551 + 552 + {:error, reason} 553 + end 554 + end 555 + end 556 + end 557 + 558 + defp compute_content_hash(seeds) do 559 + seeds 560 + |> Enum.map(& &1.id) 561 + |> Enum.sort() 562 + |> Enum.join(":") 563 + |> then(&:crypto.hash(:sha256, &1)) 564 + |> Base.encode16(case: :lower) 565 + end 566 + 567 + defp find_duplicate_deployment(_agent_id, _content_hash, true), do: :proceed 568 + 569 + defp find_duplicate_deployment(agent_id, content_hash, false) do 570 + query = 571 + from(d in __MODULE__, 572 + where: 573 + d.agent_id == ^agent_id and 574 + d.content_hash == ^content_hash and 575 + (d.result == :success or is_nil(d.result)), 576 + order_by: [desc: d.inserted_at], 577 + limit: 1 578 + ) 579 + 580 + case Repo.one(query) do 581 + nil -> :proceed 582 + deployment -> {:skip, deployment} 583 + end 584 + end 585 + 586 + defp deployment_event_payload(%__MODULE__{} = deployment, request_id) do 587 + %SowerClient.Orchestration.Deployment{ 588 + request_id: request_id, 589 + sid: deployment.sid, 590 + seed_deployments: build_seed_deployments(deployment.seeds, deployment.subscriptions), 591 + skipped: false 592 + } 593 + end 594 + 595 + defp build_seed_deployments(seeds, subscriptions) do 596 + Enum.map(seeds, fn seed -> 597 + subscription_sid = 598 + subscriptions 599 + |> Enum.find(fn sub -> 600 + sub.seed_name == seed.name and sub.seed_type == seed.seed_type 601 + end) 602 + |> case do 603 + nil -> nil 604 + sub -> sub.sid 605 + end 606 + 607 + %SowerClient.Orchestration.SeedDeployment{ 608 + seed: seed, 609 + subscription_sid: subscription_sid 610 + } 611 + end) 612 + end 613 + 614 + defp mark_deployments_dispatched([], _dispatched_at), do: :ok 615 + 616 + defp mark_deployments_dispatched(deployments, dispatched_at) do 617 + ids = Enum.map(deployments, & &1.id) 618 + now = DateTime.utc_now() 619 + 620 + from(d in __MODULE__, 621 + where: d.id in ^ids and is_nil(d.result) 622 + ) 623 + |> Repo.update_all(set: [last_dispatched_at: dispatched_at, updated_at: now]) 624 + 625 + :ok 626 + end 627 + 628 + defp finalize_stale_deployment(%__MODULE__{} = deployment, now) do 629 + previous_org_id = Repo.get_org_id() 630 + Repo.put_org_id(deployment.org_id) 631 + 632 + result = 633 + case Repo.get(__MODULE__, deployment.id) do 634 + nil -> 635 + :ignore 636 + 637 + %__MODULE__{result: nil} = unresolved -> 638 + update_deployment(unresolved, %{deployed_at: now, result: :failure}) 639 + 640 + %__MODULE__{} -> 641 + :ignore 642 + end 643 + 644 + Repo.put_org_id(previous_org_id) 645 + 646 + result 647 + end 648 + 649 + defp stale_after_seconds do 650 + config = Application.get_env(:sower, Sower.Orchestration, []) 651 + Keyword.get(config, :stale_after_seconds, @default_stale_after_seconds) 652 + end 653 + 654 + defp stale_batch_size do 655 + config = Application.get_env(:sower, Sower.Orchestration, []) 656 + Keyword.get(config, :stale_batch_size, @default_stale_batch_size) 52 657 end 53 658 end
+171 -2
apps/sower/lib/sower/orchestration/subscription.ex
··· 1 1 defmodule Sower.Orchestration.Subscription do 2 2 use Sower.Schema 3 3 import Ecto.Changeset 4 + import Ecto.Query, warn: false 5 + 6 + alias Sower.Repo 7 + alias Sower.Orchestration.{Agent, Deployment, SubscriptionDeployment} 4 8 5 9 @derive {Jason.Encoder, only: [:sid]} 6 10 @derive {Phoenix.Param, key: :sid} 7 - 8 - alias Sower.Orchestration.{Agent, Deployment, SubscriptionDeployment} 9 11 10 12 schema "subscriptions" do 11 13 field :sid, SowerClient.Sid, autogenerate: true ··· 28 30 |> cast(attrs, [:agent_id, :seed_name, :seed_type]) 29 31 |> cast_embed(:rules, with: &__MODULE__.Rule.changeset/2) 30 32 |> unique_constraint([:agent_id, :org_id, :seed_name, :seed_type]) 33 + end 34 + 35 + def list_subscriptions do 36 + Repo.all(__MODULE__) 37 + |> Repo.preload([:agent]) 38 + end 39 + 40 + def list_subscriptions_for_agent(%Agent{} = agent) do 41 + __MODULE__ 42 + |> where([s], s.agent_id == ^agent.id) 43 + |> Repo.all() 44 + end 45 + 46 + def get_subscription!(id) do 47 + Repo.get!(__MODULE__, id) 48 + |> Repo.preload(:agent) 49 + end 50 + 51 + def get_subscription_sid!(sid), do: Repo.get_by!(__MODULE__, sid: sid) 52 + 53 + def get_subscription_sid(sid) do 54 + __MODULE__ 55 + |> Repo.get_by(sid: sid) 56 + end 57 + 58 + def get_subscription_sid_with_deployments!(sid) do 59 + subscription = get_subscription_sid!(sid) 60 + 61 + Repo.preload(subscription, [ 62 + :agent, 63 + deployments: 64 + from(d in Deployment, 65 + order_by: [ 66 + desc: fragment("? IS NULL", d.deployed_at), 67 + desc: d.deployed_at, 68 + desc: d.inserted_at 69 + ] 70 + ) 71 + ]) 72 + end 73 + 74 + def get_subscription_sid_with_deployments(sid) do 75 + get_subscription_sid(sid) 76 + |> Repo.preload([ 77 + :agent, 78 + deployments: 79 + from(d in Deployment, 80 + order_by: [ 81 + desc: fragment("? IS NULL", d.deployed_at), 82 + desc: d.deployed_at, 83 + desc: d.inserted_at 84 + ] 85 + ) 86 + ]) 87 + end 88 + 89 + def get_subscription_sids(sids) when is_list(sids) and length(sids) > 0 do 90 + query = from(sub in __MODULE__, where: sub.sid in ^sids) 91 + 92 + Repo.all(query) 93 + end 94 + 95 + def get_subscription_sids(sids) when is_list(sids) and length(sids) == 0 do 96 + {:error, :no_sids_provided} 97 + end 98 + 99 + def find_subscription(%Sower.Orchestration.Seed{} = seed) do 100 + rules_filter = 101 + Enum.map(seed.tags || [], fn tag -> 102 + %{key: tag.key, value: tag.value} 103 + end) 104 + 105 + from(s in __MODULE__, 106 + where: s.seed_name == ^seed.name, 107 + where: s.seed_type == ^seed.seed_type, 108 + where: 109 + fragment( 110 + """ 111 + NOT EXISTS ( 112 + SELECT 1 FROM jsonb_array_elements(?) AS r 113 + WHERE NOT EXISTS ( 114 + SELECT 1 FROM jsonb_array_elements(?) AS t 115 + WHERE t->>'key' = r->>'key' AND t->>'value' = r->>'value' 116 + ) 117 + ) 118 + """, 119 + s.rules, 120 + ^rules_filter 121 + ) 122 + ) 123 + |> Repo.all() 124 + end 125 + 126 + def create_subscription(attrs \\ %{}) do 127 + case %__MODULE__{ 128 + org_id: Repo.get_org_id(), 129 + sid: SowerClient.Sid.generate("sub") 130 + } 131 + |> changeset(attrs) 132 + |> Repo.insert( 133 + on_conflict: {:replace, [:updated_at, :rules]}, 134 + conflict_target: [:agent_id, :org_id, :seed_name, :seed_type], 135 + returning: true 136 + ) do 137 + {:ok, sub} -> {:ok, Repo.reload(sub)} 138 + err -> err 139 + end 140 + end 141 + 142 + def register_subscription( 143 + %SowerClient.Orchestration.Subscription{ 144 + seed_name: seed_name, 145 + seed_type: seed_type, 146 + rules: rules 147 + }, 148 + agent_id 149 + ) do 150 + case create_subscription(%{ 151 + agent_id: agent_id, 152 + seed_name: seed_name, 153 + seed_type: seed_type, 154 + rules: rules 155 + }) do 156 + {:ok, subscription} -> 157 + {:ok, SowerClient.Orchestration.Subscription.cast!(subscription)} 158 + 159 + {:error, _} = error -> 160 + error 161 + end 162 + end 163 + 164 + def sync_subscriptions(subscriptions, agent_id) do 165 + Repo.transaction(fn -> 166 + registered = 167 + Enum.map(subscriptions, fn sub -> 168 + case register_subscription(sub, agent_id) do 169 + {:ok, s} -> s 170 + {:error, reason} -> Repo.rollback(reason) 171 + end 172 + end) 173 + 174 + registered_sids = Enum.map(registered, & &1.sid) 175 + 176 + from(s in __MODULE__, 177 + where: s.agent_id == ^agent_id, 178 + where: s.sid not in ^registered_sids 179 + ) 180 + |> Repo.delete_all() 181 + 182 + registered 183 + end) 184 + end 185 + 186 + def update_subscription(%__MODULE__{} = subscription, attrs) do 187 + subscription 188 + |> changeset(attrs) 189 + |> Repo.update() 190 + end 191 + 192 + def delete_subscription(%__MODULE__{} = subscription) do 193 + Repo.delete(subscription) 194 + end 195 + 196 + def change_subscription(%__MODULE__{} = subscription, attrs \\ %{}) do 197 + subscription 198 + |> Repo.preload(:agent) 199 + |> changeset(attrs) 31 200 end 32 201 33 202 defmodule Rule do
+2 -2
apps/sower/lib/sower/repo/seeds/org.ex
··· 66 66 |> Enum.map(fn t -> 67 67 name = ~s"test#{t}" 68 68 69 - case Sower.Seed.get(name, "nixos") do 69 + case Sower.Orchestration.Seed.get(name, "nixos") do 70 70 nil -> 71 - Sower.Seed.create(%{ 71 + Sower.Orchestration.Seed.create(%{ 72 72 name: name, 73 73 seed_type: "nixos", 74 74 org_id: user.org_id,
+8 -7
apps/sower/lib/sower/seed.ex apps/sower/lib/sower/orchestration/seed.ex
··· 1 - defmodule Sower.Seed do 1 + defmodule Sower.Orchestration.Seed do 2 2 use Sower.Schema 3 3 4 4 import Ecto.Changeset 5 5 import Ecto.Query, only: [from: 2] 6 6 7 - alias Sower.{Repo, Seed, SeedTag} 7 + alias Sower.Repo 8 + alias Sower.Orchestration.{Seed, SeedTag} 8 9 alias Ecto.Multi 9 10 10 11 @derive {Jason.Encoder, only: [:sid, :name, :seed_type, :artifact, :tags]} ··· 300 301 301 302 ## Examples 302 303 303 - iex> Sower.Seed.extract_info_from_store_path("/nix/store/abc123-nixos-system-myhost-25.11") 304 + iex> Sower.Orchestration.Seed.extract_info_from_store_path("/nix/store/abc123-nixos-system-myhost-25.11") 304 305 {"myhost", [%{key: "nixos_version", value: "25.05"}]} 305 306 306 - iex> Sower.Seed.extract_info_from_store_path("/nix/store/xyz789-home-manager-generation") 307 + iex> Sower.Orchestration.Seed.extract_info_from_store_path("/nix/store/xyz789-home-manager-generation") 307 308 {"home-manager-generation", []} 308 309 """ 309 310 def extract_info_from_store_path(path) do ··· 329 330 330 331 ## Examples 331 332 332 - iex> Sower.Seed.seed_type_from_profile_path("/nix/var/nix/profiles/system") 333 + iex> Sower.Orchestration.Seed.seed_type_from_profile_path("/nix/var/nix/profiles/system") 333 334 "nixos" 334 335 335 - iex> Sower.Seed.seed_type_from_profile_path("/home/user/.local/state/nix/profiles/home-manager") 336 + iex> Sower.Orchestration.Seed.seed_type_from_profile_path("/home/user/.local/state/nix/profiles/home-manager") 336 337 "home-manager" 337 338 338 - iex> Sower.Seed.seed_type_from_profile_path("/run/current-system/sw") 339 + iex> Sower.Orchestration.Seed.seed_type_from_profile_path("/run/current-system/sw") 339 340 "nixos" 340 341 """ 341 342 def seed_type_from_profile_path(profile_path) do
+2 -2
apps/sower/lib/sower/seed_tag.ex apps/sower/lib/sower/orchestration/seed_tag.ex
··· 1 - defmodule Sower.SeedTag do 1 + defmodule Sower.Orchestration.SeedTag do 2 2 use Sower.Schema 3 3 import Ecto.Changeset 4 4 ··· 8 8 field :key, :string 9 9 field :value, :string 10 10 11 - belongs_to :seed, Sower.Seed 11 + belongs_to :seed, Sower.Orchestration.Seed 12 12 end 13 13 14 14 def changeset(tag, attrs) do
+1 -1
apps/sower/lib/sower_web/agent_channel.ex
··· 100 100 end 101 101 end 102 102 103 - handle_schema(SowerClient.Seed, &Sower.Seed.get_by_request/1) 103 + handle_schema(SowerClient.Seed, &Sower.Orchestration.Seed.get_by_request/1) 104 104 105 105 handle_schema(SowerClient.Orchestration.Subscription, fn req, socket -> 106 106 Sower.Orchestration.register_subscription(req, socket.assigns.agent.id)
+12 -12
apps/sower/lib/sower_web/controllers/api/seed_controller.ex
··· 49 49 conn = Map.put(conn, :body_params, %{}) 50 50 51 51 if can(conn.assigns.access_token) 52 - |> create?(%Sower.Seed{org_id: conn.assigns.access_token.org_id}) do 52 + |> create?(%Sower.Orchestration.Seed{org_id: conn.assigns.access_token.org_id}) do 53 53 seed_attrs = %{name: name, seed_type: seed_type, artifact: artifact} 54 54 55 55 seed_attrs = ··· 61 61 Map.put(seed_attrs, :tags, Enum.map(tags, &Map.from_struct/1)) 62 62 end 63 63 64 - case Sower.Seed.create(seed_attrs, rename: rename) do 65 - {:ok, %Sower.Seed{} = seed} -> 64 + case Sower.Orchestration.Seed.create(seed_attrs, rename: rename) do 65 + {:ok, %Sower.Orchestration.Seed{} = seed} -> 66 66 conn 67 67 |> put_status(:created) 68 68 |> render(:show, seed: seed) ··· 110 110 111 111 def latest(conn, %{name: name, seed_type: seed_type} = params) do 112 112 if can(conn.assigns.access_token) 113 - |> read?(%Sower.Seed{org_id: conn.assigns.access_token.org_id}) do 113 + |> read?(%Sower.Orchestration.Seed{org_id: conn.assigns.access_token.org_id}) do 114 114 tags = parse_tags(params[:tags]) 115 115 116 - case Sower.Seed.latest(name, seed_type, tags) do 116 + case Sower.Orchestration.Seed.latest(name, seed_type, tags) do 117 117 nil -> 118 118 conn |> put_status(404) |> render(:not_found) 119 119 ··· 159 159 def get(conn, %{sid: sid}) do 160 160 if conn.assigns.access_token 161 161 |> can() 162 - |> read?(%Sower.Seed{org_id: conn.assigns.access_token.org_id}) do 163 - case Sower.Seed.get_sid(sid) do 162 + |> read?(%Sower.Orchestration.Seed{org_id: conn.assigns.access_token.org_id}) do 163 + case Sower.Orchestration.Seed.get_sid(sid) do 164 164 nil -> 165 165 conn |> put_status(404) |> render(:error, error: "not found") 166 166 ··· 174 174 175 175 def get(conn, _) do 176 176 if can(conn.assigns.access_token) 177 - |> read?(%Sower.Seed{org_id: conn.assigns.access_token.org_id}) do 177 + |> read?(%Sower.Orchestration.Seed{org_id: conn.assigns.access_token.org_id}) do 178 178 conn |> put_status(:not_found) |> render(:not_found) 179 179 else 180 180 conn |> put_status(401) |> render(:error, error: "unauthorized") ··· 209 209 210 210 def list(conn, %{name: name, seed_type: seed_type}) do 211 211 if can(conn.assigns.access_token) 212 - |> read?(%Sower.Seed{org_id: conn.assigns.access_token.org_id}) do 213 - seed = Sower.Seed.get(name, seed_type) 212 + |> read?(%Sower.Orchestration.Seed{org_id: conn.assigns.access_token.org_id}) do 213 + seed = Sower.Orchestration.Seed.get(name, seed_type) 214 214 215 215 case seed do 216 216 nil -> ··· 226 226 227 227 def list(conn, _) do 228 228 if can(conn.assigns.access_token) 229 - |> read?(%Sower.Seed{org_id: conn.assigns.access_token.org_id}) do 230 - seeds = Sower.Seed.list() 229 + |> read?(%Sower.Orchestration.Seed{org_id: conn.assigns.access_token.org_id}) do 230 + seeds = Sower.Orchestration.Seed.list() 231 231 render(conn, :list, seeds: seeds) 232 232 else 233 233 conn |> put_status(401) |> render(:error, error: "unauthorized")
+1 -1
apps/sower/lib/sower_web/live/seed_live/index.ex
··· 5 5 6 6 @impl true 7 7 def mount(_params, _session, socket) do 8 - {:ok, stream(socket, :seeds, Sower.Seed.list())} 8 + {:ok, stream(socket, :seeds, Sower.Orchestration.Seed.list())} 9 9 end 10 10 end
+1 -1
apps/sower/lib/sower_web/live/seed_live/show.ex
··· 10 10 11 11 @impl true 12 12 def handle_params(%{"sid" => sid}, _, socket) do 13 - case Sower.Seed.get_sid(sid) do 13 + case Sower.Orchestration.Seed.get_sid(sid) do 14 14 nil -> 15 15 {:noreply, 16 16 socket
+1 -1
apps/sower/test/sower/orchestration_test.exs
··· 591 591 592 592 describe "update_agent_seed_generations/2 with auto-registration" do 593 593 alias Sower.Orchestration.{AgentSeedGeneration, NixProfile} 594 - alias Sower.Seed 594 + alias Sower.Orchestration.Seed 595 595 596 596 import Sower.OrchestrationFixtures 597 597
+2 -2
apps/sower/test/sower/seed_test.exs
··· 5 5 import Sower.OrchestrationFixtures 6 6 import Sower.SeedFixtures 7 7 8 - alias Sower.Seed 8 + alias Sower.Orchestration.Seed 9 9 10 10 setup _ do 11 11 org = organization_fixture() ··· 41 41 42 42 {:ok, _} = Seed.create(Map.from_struct(seed)) 43 43 44 - assert Repo.all(Sower.Seed) |> Enum.count() == 1 44 + assert Repo.all(Sower.Orchestration.Seed) |> Enum.count() == 1 45 45 end 46 46 end 47 47
+2 -2
apps/sower/test/support/fixtures/seed_fixtures.ex
··· 1 1 defmodule Sower.SeedFixtures do 2 2 @moduledoc """ 3 3 This module defines test helpers for creating 4 - entities via the `Sower.Seed` context. 4 + entities via the `Sower.Orchestration.Seed` context. 5 5 """ 6 6 7 7 def unique_seed_name, do: "seed#{System.unique_integer()}" ··· 22 22 {:ok, seed} = 23 23 attrs 24 24 |> valid_seed_attributes() 25 - |> Sower.Seed.create() 25 + |> Sower.Orchestration.Seed.create() 26 26 27 27 seed 28 28 end