this repo has no description
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 293 lines 8.1 kB view raw
defmodule Hobbes.Workloads do
  @moduledoc """
  Helpers for running workload modules against a sandbox cluster and
  reporting the outcome, plus small formatting utilities (`table/2`,
  `mean/1`, `pretty_number/1`) for building workload result messages.

  `run/2` is the entry point: it executes the given workloads once per seed,
  logs a summary and raises if any seed failed.
  """

  require Logger
  alias Trinity.{Sim, SimServer, SimLogger}

  alias Hobbes.Structs.Cluster

  import Hobbes.Utils

  defmodule Workload do
    @moduledoc """
    Behaviour for workload modules. `Hobbes.Workloads.Runner` calls
    `run/2` with the shared context map and the per-workload options.
    """

    @callback run(context :: map, opts :: keyword) :: {:ok, term}
  end

  defmodule Runner do
    @moduledoc """
    Server process that executes a single workload module and keeps its
    result until it is fetched with `get_results/1`.

    The process is started and addressed through `Trinity.SimServer`.
    """

    use GenServer
    alias Trinity.SimServer

    defmodule State do
      @moduledoc false
      @enforce_keys [:workload_module, :opts, :context, :results]
      defstruct @enforce_keys
    end

    @doc """
    Starts a runner for `workload_module` with its `opts` and the shared `context`.
    """
    @spec start_link(module, keyword, map) :: GenServer.on_start()
    def start_link(workload_module, opts, context) do
      SimServer.start_link(__MODULE__, {workload_module, opts, context})
    end

    @doc """
    Asynchronously asks the runner to execute its workload (sent as a cast).
    """
    def run(server), do: SimServer.cast(server, :run)

    @doc """
    Returns the stored workload results, waiting without a timeout.

    The value is `nil` until a `:run` cast has been processed.
    """
    def get_results(server), do: SimServer.call(server, :get_results, :infinity)

    @impl true
    def init({workload_module, opts, context})
        when is_atom(workload_module) and is_list(opts) and is_map(context) do
      state = %State{
        workload_module: workload_module,
        opts: opts,
        context: context,
        results: nil
      }

      {:ok, state}
    end

    @impl true
    def handle_call(:get_results, _from, %State{} = state) do
      {:reply, state.results, state}
    end

    @impl true
    def handle_cast(:run, %State{} = state) do
      # The workload runs inside the server process, so a later
      # `:get_results` call is only answered once it has finished.
      results = state.workload_module.run(state.context, state.opts)
      {:noreply, %State{state | results: results}}
    end
  end

  @type workload_opts :: [
          simulated: boolean
        ]

  @doc """
  Runs `workloads` (a list of `{module, opts}` tuples) once per seed and logs a summary.

  ## Options

    * `:count` - number of seeds to run when `:seed` is not given; seeds start
      at 100 (default `1`).
    * `:seed` - run exactly this one seed instead of a generated range.
    * `:simulated` - when truthy, each seed is executed inside
      `Sim.run_simulation/2` with that seed.
    * `:name` - test name shown in the summary (default `"Untitled"`).

  All remaining options are passed on to `run_one/3`.

  Returns `:ok` when every seed succeeded and raises a `RuntimeError`
  when at least one seed failed.
  """
  @spec run([{module, keyword}], keyword) :: :ok
  def run(workloads, opts \\ []) when is_list(workloads) do
    {count, opts} = Keyword.pop(opts, :count, 1)
    {seed, opts} = Keyword.pop(opts, :seed, nil)

    seeds =
      if seed do
        [seed]
      else
        # Explicit step: without it `count: 0` would produce the descending
        # range 100..99 and run two seeds instead of none.
        100..(100 + count - 1)//1
      end

    {:ok, supervisor} = Task.Supervisor.start_link()

    results =
      seeds
      |> Enum.map(fn seed ->
        # `async_nolink` keeps a crashing seed from taking this process down;
        # the crash is reported as `{:exit, reason}` by `Task.yield_many/2`.
        Task.Supervisor.async_nolink(supervisor, fn ->
          if opts[:simulated] do
            Sim.run_simulation(fn -> run_one(seed, workloads, opts) end, seed: seed)
          else
            run_one(seed, workloads, opts)
          end
        end)
      end)
      |> Task.yield_many(:infinity)
      |> Enum.map(fn {_task, result} -> result end)
      |> Enum.zip(seeds)

    {succeeded, failed} =
      Enum.split_with(results, fn
        {{:ok, _value}, _seed} -> true
        {{:exit, _error}, _seed} -> false
      end)

    succeeded_message = Enum.map_join(succeeded, "\n", &seed_success_message/1)
    failed_message = Enum.map_join(failed, "\n", &seed_failure_message/1)

    num_failed = length(failed)

    header = """

    Results for test #{inspect(opts[:name] || "Untitled")}

    #{length(results)} test workloads, #{num_failed} failures\
    """

    Logger.info("""
    #{color(header, :cyan)}

    #{(if num_failed > 0, do: failed_message, else: succeeded_message) |> indent()}\
    """)

    if num_failed > 0, do: raise("At least one workload failed!")

    :ok
  end

  # Formats the summary entry for one successful seed.
  defp seed_success_message({{:ok, {:ok, result}}, seed}) do
    """
    Seed #{seed} succeeded with results:
    #{indent(result.message)}\
    """
  end

  # Formats the summary entry for one failed seed, depending on the exit reason.
  defp seed_failure_message({{:exit, {:timeout, _mfa} = reason}, seed}) do
    """
    - Seed #{inspect(seed)} timed out
    #{Exception.format_exit(reason) |> color(:red) |> indent()}\
    """
  end

  # Only treat the reason as `{exception, stacktrace}` when the second element
  # is a list; other 2-tuple exit reasons (e.g. `{:shutdown, term}`) would make
  # `Exception.format_stacktrace/1` crash while building the report.
  defp seed_failure_message({{:exit, {exception, stacktrace}}, seed})
       when is_list(stacktrace) do
    """
    - Seed #{inspect(seed)} failed with error:
    #{Exception.format_banner(:error, exception, stacktrace) |> color(:red) |> indent()}
    #{"stacktrace:" |> color(:cyan) |> indent()}
    #{Exception.format_stacktrace(stacktrace)}
    """
  end

  defp seed_failure_message({{:exit, reason}, seed}) do
    """
    - Seed #{inspect(seed)} failed with reason:
    #{inspect(reason) |> color(:red)}
    """
  end

  @doc """
  Starts a sandbox cluster, runs every workload against it and returns
  `{:ok, %{message: message}}` with a human-readable report.

  The `:cluster_opts` entry of `opts` is forwarded to
  `Hobbes.Sandbox.start_cluster/1`; `:distributed` and `:cluster_name` are
  always overwritten. The seed argument is not used here.
  """
  def run_one(_seed, workloads, opts \\ []) do
    {cluster_opts, _opts} = Keyword.pop(opts, :cluster_opts, [])
    cluster_name = "cluster"

    {:ok, _pid} = Hobbes.Cache.start_link()

    # Use a distributed cluster in simulation
    cluster_opts =
      cluster_opts
      |> Keyword.put(:distributed, Sim.simulated?())
      |> Keyword.put(:cluster_name, cluster_name)

    {:ok, _coordinator_pids} = Hobbes.Sandbox.start_cluster(cluster_opts)

    # Keep retrying get_cluster until coordinators are connected
    {:ok, %Cluster{} = cluster} =
      retry_acc(nil, fn nil ->
        case Hobbes.Cache.lookup_cluster(cluster_name) do
          {:ok, _cluster} = result -> {:halt, result}
          {:error, _err} = error -> {:cont, nil, error}
        end
      end, 20)

    context = %{cluster_name: cluster_name, cluster: cluster}

    runners =
      Enum.map(workloads, fn {module, opts} when is_atom(module) and is_list(opts) ->
        {:ok, pid} = Runner.start_link(module, opts, context)
        pid
      end)

    # Kick off every runner first, then collect, so the workloads can overlap.
    Enum.each(runners, &Runner.run/1)
    results = Enum.map(runners, &Runner.get_results/1)

    result_logs =
      workloads
      |> Enum.zip(results)
      |> Enum.map_join("\n\n", fn {{module, opts}, {:ok, message}} when is_binary(message) ->
        """
        - #{inspect(module)} with opts #{inspect(opts)}
        #{message |> color(:light_black) |> indent(4)}\
        """
      end)

    {hash, log_size} =
      case Sim.simulated?() do
        true -> {SimLogger.get_hash(), SimLogger.get_log_size()}
        false -> {0, 0}
      end

    #Logger.debug SimLogger.log_tail(10_000) |> Enum.join("\n")

    # Short, readable fingerprint of the 32-bit log hash.
    readable_hash =
      :crypto.hash(:sha256, <<hash::integer-32>>)
      |> Base.encode32(padding: false)
      |> String.slice(0, 8)

    message = """
    Logged #{log_size} events with hash #{hash}
    SHA256: #{readable_hash}

    #{result_logs}
    """

    {:ok, %{message: message}}
  end

  # Prefixes every line of `string` with `spaces` spaces.
  defp indent(string, spaces \\ 2) when is_binary(string) and is_integer(spaces) do
    whitespace = String.duplicate(" ", spaces)
    whitespace <> String.replace(string, "\n", "\n" <> whitespace)
  end

  # Wraps `string` in the ANSI sequence for `color` and returns a binary.
  defp color(string, color) when is_atom(color) do
    [color, string]
    |> IO.ANSI.format()
    |> IO.iodata_to_binary()
  end

  @doc """
  Renders `rows` (a list of rows, each a list of strings) as a text table.

  ## Options

    * `:cols` - header labels; defaults to `"1"`, `"2"`, ... based on the
      width of the first row.
    * `:align` - list of `:left` / `:right`, one per column; defaults to
      all `:left`.
    * `:limit` - maximum number of data rows to render (positive integer or
      `:infinity`, the default). When rows are cut off, a trailing
      `"N more rows..."` line is appended.

  The result is a single string: separator, header, separator, then the rows.
  """
  @spec table([[String.t()]], keyword) :: String.t()
  def table(rows, opts \\ []) do
    cols =
      Keyword.get_lazy(opts, :cols, fn ->
        Enum.map(1..length(hd(rows)), &Integer.to_string/1)
      end)

    align =
      Keyword.get_lazy(opts, :align, fn ->
        Enum.map(1..length(hd(rows)), fn _i -> :left end)
      end)

    limit =
      case Keyword.get(opts, :limit, :infinity) do
        :infinity -> :infinity
        # +3 for header
        limit when is_integer(limit) and limit > 0 -> limit + 3
      end

    rows = [cols | rows]

    # Width of each column is the longest cell in it, header included.
    col_sizes =
      Enum.map(0..(length(hd(rows)) - 1), fn c_i ->
        rows
        |> Enum.map(fn r -> r |> Enum.at(c_i) |> String.length() end)
        |> Enum.max()
      end)

    col_meta = Enum.zip(col_sizes, align)

    [header | rest] =
      Enum.map(rows, fn r ->
        r
        |> Enum.zip(col_meta)
        |> Enum.map_join(" | ", fn
          {v, {c_size, :left}} -> String.pad_trailing(v, c_size)
          {v, {c_size, :right}} -> String.pad_leading(v, c_size)
        end)
      end)

    sep = String.duplicate("-", String.length(header))
    lines = [sep, header, sep | rest]
    line_count = length(lines)

    if limit != :infinity and line_count > limit do
      visible = Enum.take(lines, limit)
      Enum.join(visible, "\n") <> "\n#{line_count - limit} more rows..."
    else
      Enum.join(lines, "\n")
    end
  end

  @doc """
  Returns the arithmetic mean of `numbers`, or `0` for an empty list.
  """
  @spec mean([number]) :: number
  def mean([]), do: 0
  def mean(numbers), do: Enum.sum(numbers) / length(numbers)

  @doc """
  Formats a number with thousands separators.

  Integers are rendered as-is; floats are rounded to two decimals.
  A leading minus sign is kept in front of the grouped digits.
  """
  @spec pretty_number(number) :: String.t()
  def pretty_number(number) when is_integer(number) do
    number
    |> Integer.to_string()
    |> commas()
  end

  def pretty_number(number) when is_float(number) do
    string = :erlang.float_to_binary(number, decimals: 2)
    [i, f] = String.split(string, ".")
    commas(i) <> "." <> f
  end

  # Inserts a comma between every group of three digits, counted from the right.
  # The sign is split off first so it is never grouped as if it were a digit
  # (which used to turn "-123" into "-,123").
  defp commas("-" <> digits), do: "-" <> commas(digits)

  defp commas(number_string) when is_binary(number_string) do
    number_string
    |> String.to_charlist()
    |> Enum.reverse()
    |> Enum.chunk_every(3)
    |> Enum.reverse()
    |> Enum.map_join(",", &Enum.reverse/1)
  end
end