this repo has no description
1defmodule Hobbes.Workloads do
2 require Logger
3 alias Trinity.{Sim, SimServer, SimLogger}
4
5 alias Hobbes.Structs.Cluster
6
7 import Hobbes.Utils
8
9 defmodule Workload do
10 @callback run(context :: map, opts :: keyword) :: {:ok, term}
11 end
12
13 defmodule Runner do
14 use GenServer
15 alias Trinity.SimServer
16
17 defmodule State do
18 @enforce_keys [:workload_module, :opts, :context, :results]
19 defstruct @enforce_keys
20 end
21
22 @spec start_link(module, keyword, map) :: GenServer.on_start
23 def start_link(workload_module, opts, context) do
24 SimServer.start_link(__MODULE__, {workload_module, opts, context})
25 end
26
27 def run(server), do: SimServer.cast(server, :run)
28
29 def get_results(server), do: SimServer.call(server, :get_results, :infinity)
30
31 def init({workload_module, opts, context})
32 when is_atom(workload_module) and is_list(opts) and is_map(context) do
33 {:ok, %State{
34 workload_module: workload_module,
35 opts: opts,
36 context: context,
37 results: nil,
38 }}
39 end
40
41 def handle_call(:get_results, _from, %State{} = state) do
42 {:reply, state.results, state}
43 end
44
45 def handle_cast(:run, %State{} = state) do
46 results = state.workload_module.run(state.context, state.opts)
47 {:noreply, %State{state | results: results}}
48 end
49 end
50
51 @type workload_opts :: [
52 simulated: boolean,
53 ]
54
55 @spec run([{module, keyword}], keyword) :: :ok
56 def run(workloads, opts \\ []) when is_list(workloads) do
57 {count, opts} = Keyword.pop(opts, :count, 1)
58 {seed, opts} = Keyword.pop(opts, :seed, nil)
59
60 seeds =
61 if seed do
62 [seed]
63 else
64 100..(100 + count - 1)
65 end
66
67 {:ok, supervisor} = Task.Supervisor.start_link()
68
69 results =
70 Enum.map(seeds, fn seed ->
71 Task.Supervisor.async_nolink(supervisor, fn ->
72 if opts[:simulated] do
73 Sim.run_simulation(fn ->
74 run_one(seed, workloads, opts)
75 end, seed: seed)
76 else
77 run_one(seed, workloads, opts)
78 end
79 end)
80 end)
81 |> Task.yield_many(:infinity)
82 |> Enum.map(fn {_task, result} -> result end)
83 |> Enum.zip(seeds)
84
85 {succeeded, failed} = Enum.split_with(results, fn
86 {{:ok, _value}, _seed} -> true
87 {{:exit, _error}, _seed} -> false
88 end)
89
90 succeeded_message =
91 succeeded
92 |> Enum.map(fn {{:ok, {:ok, result}}, seed} ->
93 """
94 Seed #{seed} succeeded with results:
95 #{indent(result.message)}\
96 """
97 end)
98 |> Enum.join("\n")
99
100 failed_message =
101 failed
102 |> Enum.map(fn
103 {{:exit, {:timeout, _mfa} = reason}, seed} ->
104 """
105 - Seed #{inspect(seed)} timed out
106 #{Exception.format_exit(reason) |> color(:red) |> indent()}\
107 """
108
109 {{:exit, {exception, stacktrace}}, seed} ->
110 """
111 - Seed #{inspect(seed)} failed with error:
112 #{Exception.format_banner(:error, exception, stacktrace) |> color(:red) |> indent()}
113 #{"stacktrace:" |> color(:cyan) |> indent()}
114 #{Exception.format_stacktrace(stacktrace)}
115 """
116
117 {{:exit, reason}, seed} ->
118 """
119 - Seed #{inspect(seed)} failed with reason:
120 #{inspect(reason) |> color(:red)}
121 """
122 end)
123 |> Enum.join("\n")
124
125 num_failed = length(failed)
126
127 header = """
128
129 Results for test #{inspect(opts[:name] || "Untitled")}
130
131 #{length(results)} test workloads, #{num_failed} failures\
132 """
133
134 Logger.info """
135 #{color(header, :cyan)}
136
137 #{(if num_failed > 0, do: failed_message, else: succeeded_message) |> indent()}\
138 """
139
140 if num_failed > 0, do: raise "At least one workload failed!"
141
142 :ok
143 end
144
145 def run_one(_seed, workloads, opts \\ []) do
146 {cluster_opts, _opts} = Keyword.pop(opts, :cluster_opts, [])
147 cluster_name = "cluster"
148
149 {:ok, _pid} = Hobbes.Cache.start_link()
150
151 # Use a distributed cluster in simulation
152 cluster_opts = Keyword.put(cluster_opts, :distributed, Sim.simulated?())
153 cluster_opts = Keyword.put(cluster_opts, :cluster_name, cluster_name)
154 {:ok, _coordinator_pids} = Hobbes.Sandbox.start_cluster(cluster_opts)
155
156 # Keep retrying get_cluster until coordinators are connected
157 {:ok, %Cluster{} = cluster} =
158 retry_acc(nil, fn nil ->
159 case Hobbes.Cache.lookup_cluster(cluster_name) do
160 {:ok, _cluster} = result -> {:halt, result}
161 {:error, _err} = error -> {:cont, nil, error}
162 end
163 end, 20)
164
165 context = %{cluster_name: cluster_name, cluster: cluster}
166
167 runners = Enum.map(workloads, fn {module, opts} when is_atom(module) and is_list(opts) ->
168 {:ok, pid} = Runner.start_link(module, opts, context)
169 pid
170 end)
171
172 Enum.each(runners, &Runner.run/1)
173 results = Enum.map(runners, &Runner.get_results/1)
174
175 result_logs =
176 Enum.zip(workloads, results)
177 |> Enum.map(fn {{module, opts}, {:ok, message}} when is_binary(message) ->
178 """
179 - #{inspect(module)} with opts #{inspect(opts)}
180 #{message |> color(:light_black) |> indent(4)}\
181 """
182 end)
183 |> Enum.join("\n\n")
184
185 {hash, log_size} =
186 case Sim.simulated?() do
187 true -> {SimLogger.get_hash, SimLogger.get_log_size()}
188 false -> {0, 0}
189 end
190 #Logger.debug SimLogger.log_tail(10_000) |> Enum.join("\n")
191
192 readable_hash =
193 :crypto.hash(:sha256, <<hash::integer-32>>)
194 |> Base.encode32(padding: false)
195 |> String.slice(0, 8)
196
197 message = """
198 Logged #{log_size} events with hash #{hash}
199 SHA256: #{readable_hash}
200
201 #{result_logs}
202 """
203 {:ok, %{message: message}}
204 end
205
206 defp indent(string, spaces \\ 2) when is_binary(string) and is_integer(spaces) do
207 whitespace = String.duplicate(" ", spaces)
208 whitespace <> String.replace(string, "\n", "\n" <> whitespace)
209 end
210
211 defp color(string, color) when is_atom(color) do
212 IO.ANSI.format([color, string])
213 |> IO.iodata_to_binary()
214 end
215
216 def table(rows, opts \\ []) do
217 cols = Keyword.get_lazy(opts, :cols, fn ->
218 Enum.map(1..length(hd(rows)), &Integer.to_string/1)
219 end)
220
221 align = Keyword.get_lazy(opts, :align, fn ->
222 Enum.map(1..length(hd(rows)), fn _i -> :left end)
223 end)
224
225 limit = case Keyword.get(opts, :limit, :infinity) do
226 :infinity -> :infinity
227 # +3 for header
228 limit when is_integer(limit) and limit > 0 -> limit + 3
229 end
230
231 rows = [cols | rows]
232 col_sizes = Enum.map(0..(length(hd(rows)) - 1), fn c_i ->
233 rows
234 |> Enum.map(fn r -> Enum.at(r, c_i) |> String.length() end)
235 |> Enum.max()
236 end)
237 col_meta = Enum.zip(col_sizes, align)
238
239 rows
240 |> Enum.map(fn r ->
241 r
242 |> Enum.zip(col_meta)
243 |> Enum.map(fn {v, {c_size, c_align}} ->
244 case c_align do
245 :left -> String.pad_trailing(v, c_size)
246 :right -> String.pad_leading(v, c_size)
247 end
248 end)
249 |> Enum.join(" | ")
250 end)
251 |> then(fn [header | rest] ->
252 sep = String.duplicate("-", String.length(header))
253 [sep | [header | [sep | rest]]]
254 end)
255 |> then(fn rows ->
256 row_count = length(rows)
257
258 case row_count > limit do
259 true ->
260 rows = Enum.take(rows, limit)
261 Enum.join(rows, "\n") <> "\n#{row_count - limit} more rows..."
262
263 false ->
264 Enum.join(rows, "\n")
265 end
266 end)
267 end
268
269 def mean([]), do: 0
270 def mean(numbers), do: Enum.sum(numbers) / length(numbers)
271
272 def pretty_number(number) when is_integer(number) do
273 number
274 |> Integer.to_string()
275 |> commas()
276 end
277
278 def pretty_number(number) when is_float(number) do
279 string = :erlang.float_to_binary(number, decimals: 2)
280 [i, f] = String.split(string, ".")
281 commas(i) <> "." <> f
282 end
283
284 defp commas(number_string) when is_binary(number_string) do
285 number_string
286 |> String.to_charlist()
287 |> Enum.reverse()
288 |> Enum.chunk_every(3)
289 |> Enum.map(&Enum.reverse/1)
290 |> Enum.reverse()
291 |> Enum.join(",")
292 end
293end