Mix.install([
{:regulator, "~> 0.6.0"},
{:kino, "~> 0.14.2"},
{:kino_vega_lite, "~> 0.1.9"},
{:vega_lite, "~> 0.1.11"}
])
alias VegaLite, as: Vl
# Alternative limit algorithm, kept commented out for experimentation:
# {:ok, pid} = Regulator.install(:service, {Regulator.Limit.Gradient, []})
# Installs the regulator named :service using the AIMD limit module. LoadProcess
# sends every simulated request through it with Regulator.ask(:service, ...).
# NOTE(review): `timeout: 100` is presumably a latency target in milliseconds --
# confirm against the Regulator.Limit.AIMD option list.
{:ok, pid} = Regulator.install(:service, {Regulator.Limit.AIMD, [timeout: 100]})
defmodule ServerModel do
  @moduledoc """
  Simulates web server behavior under different load conditions.

  Given a request rate, `simulate/2` returns the expected latency and the
  probability that a single request succeeds.
  """

  defmodule Config do
    @moduledoc "Server configuration parameters."

    defstruct [
      normal_latency: 50,       # Base latency in ms
      capacity_threshold: 100,  # Requests/sec where degradation starts
      max_capacity: 200,        # Requests/sec where failures begin
      db_timeout: 2000          # Database timeout in ms (upper bound on latency)
    ]
  end

  defmodule Response do
    @moduledoc "Simulation response structure."

    defstruct [:latency, :success_probability]
  end

  # :math.exp/1 raises ArithmeticError once its result overflows a float
  # (argument around 709.78). The exponent is clamped below that point; the
  # latency is capped at `db_timeout` afterwards, so the clamp does not change
  # any result that could previously be computed for realistic settings.
  @max_exponent 700.0

  @doc """
  Simulates server behavior for a given requests per second (RPS).

  Returns a `ServerModel.Response` struct with latency and success probability.

  ## Parameters

    - rps: Requests per second
    - config: Optional `ServerModel.Config` struct with server parameters

  ## Behavior

    * Up to `capacity_threshold` the latency is `normal_latency`.
    * Above it the latency grows as `normal_latency * exp(2 * load)`, where
      `load` is the position of `rps` inside the
      `capacity_threshold..max_capacity` window, and is capped at `db_timeout`.
    * If that window is empty or inverted (`max_capacity <= capacity_threshold`)
      any rate above the threshold is reported at `db_timeout` instead of
      raising or producing a latency below the base latency.
    * Above `max_capacity` the success probability drops linearly and reaches
      `0.0` at twice `max_capacity`. A non-positive `max_capacity` yields `0.0`.

  ## Examples

      iex> ServerModel.simulate(150).latency |> Float.round(2)
      135.91

      iex> response = ServerModel.simulate(250)
      iex> {Float.round(response.latency, 2), response.success_probability}
      {1004.28, 0.75}

  """
  def simulate(rps, config \\ %Config{}) do
    %Response{
      # Cap latency at database timeout
      latency: min(degraded_latency(rps, config), config.db_timeout),
      success_probability: success_probability(rps, config)
    }
  end

  # Latency before the db_timeout cap is applied.
  defp degraded_latency(rps, %Config{capacity_threshold: threshold} = config)
       when rps > threshold do
    window = config.max_capacity - threshold

    if window > 0 do
      exponent = min((rps - threshold) / window * 2, @max_exponent)
      config.normal_latency * :math.exp(exponent)
    else
      # No room between "degrading" and "failing": treat the server as saturated.
      config.db_timeout
    end
  end

  defp degraded_latency(_rps, config), do: config.normal_latency

  # Probability that a request succeeds at the given rate.
  defp success_probability(_rps, %Config{max_capacity: max_capacity})
       when max_capacity <= 0,
       do: 0.0

  defp success_probability(rps, %Config{max_capacity: max_capacity})
       when rps > max_capacity do
    overload = (rps - max_capacity) / max_capacity
    max(0.0, 1.0 - overload)
  end

  defp success_probability(_rps, _config), do: 1.0
end
defmodule TimestepEMA do
  @moduledoc """
  Implements Exponential Moving Average (EMA) with timestep compensation.

  This is useful for handling data streams with irregular time intervals: the
  smoothing weight applied to a sample depends on how much time has passed
  since the previous one.
  """

  defstruct [:last_value, :last_timestamp, :decay_factor]

  @type t :: %__MODULE__{
          last_value: number() | nil,
          last_timestamp: number() | nil,
          decay_factor: float()
        }

  @doc """
  Creates a new EMA calculator with the specified decay factor.

  The decay factor determines how quickly older values lose importance.

  ## Parameters

    * decay_factor: A value strictly between 0 and 1, where higher values give
      more weight to recent observations. It is the weight a new sample gets
      when exactly one time unit has elapsed.
  """
  @spec new(float()) :: t()
  def new(decay_factor) when decay_factor > 0 and decay_factor < 1 do
    %__MODULE__{
      last_value: nil,
      last_timestamp: nil,
      decay_factor: decay_factor
    }
  end

  @doc """
  Updates the EMA with a new value and timestamp.

  ## Parameters

    * ema: The current EMA struct
    * value: The new value to incorporate
    * timestamp: The timestamp of the new value (in any consistent time unit)

  ## Returns

  A tuple containing the new EMA value and updated EMA struct.

  The first sample initializes the average and is returned unchanged. A sample
  whose timestamp is not newer than the previous one is given zero weight and
  does not move `last_timestamp` backwards; without this, a negative time
  difference would push the weight outside `0..1` (or overflow `:math.pow/2`).
  """
  @spec update(t(), number(), number()) :: {number(), t()}
  def update(%__MODULE__{last_value: nil} = ema, value, timestamp) do
    # First value initialization
    {value, %{ema | last_value: value, last_timestamp: timestamp}}
  end

  def update(%__MODULE__{} = ema, value, timestamp) do
    # Clamp so that out-of-order or duplicated timestamps yield a weight of 0.
    elapsed = max(timestamp - ema.last_timestamp, 0)

    # Weight of the new sample: 0 for no elapsed time, approaching 1 as the gap grows.
    adjusted_alpha = 1.0 - :math.pow(1.0 - ema.decay_factor, elapsed)

    new_ema_value = ema.last_value + adjusted_alpha * (value - ema.last_value)

    updated = %{
      ema
      | last_value: new_ema_value,
        last_timestamp: max(timestamp, ema.last_timestamp)
    }

    {new_ema_value, updated}
  end
end
defmodule LoadProcess do
  @moduledoc """
  Generates simulated request traffic against the `:service` regulator.

  The process subscribes to a Kino form for its parameters (target RPS and the
  `ServerModel.Config` values). Whenever its mailbox has been idle for
  `@tick_ms` milliseconds it works out how many requests are due since the
  last accounted point in time (the "watermark") and spawns one short-lived
  process per request. Each request goes through `Regulator.ask/2`; requests
  the regulator drops are reported back as `:dropped` messages and counted.
  """

  defstruct [
    parent: nil,
    log_frame: nil,
    last_watermark: nil,
    rps: nil,
    server_config: nil,
    dropped: 0,
    dropped_ema: TimestepEMA.new(0.7)
  ]

  # Mailbox idle time (ms) after which a generation tick runs.
  @tick_ms 20

  @doc """
  Spawns the load generator linked to the caller and returns its pid.

  Note that this returns the bare pid from `spawn_link/1`, not `{:ok, pid}`.
  """
  def start_link(parent, log_frame, form) do
    spawn_link(fn ->
      run(parent, log_frame, form)
    end)
  end

  @doc """
  Process body: subscribes to `form`, blocks until the first parameter event
  arrives, then enters `loop/1`.
  """
  def run(parent, log_frame, form) do
    state = %__MODULE__{
      parent: parent,
      log_frame: log_frame,
      last_watermark: :os.system_time(:millisecond)
    }

    Kino.Control.subscribe(form, :param_change)

    # Receive initial state
    receive do
      {:param_change, params} ->
        %{data: data} = params
        state = apply_params(data, state)

        # The wait above can last arbitrarily long. Restart the clock here so
        # the first tick does not emit a burst covering the whole wait.
        loop(%{state | last_watermark: :os.system_time(:millisecond)})
    end
  end

  @doc """
  Copies the form values in `data` into `state` and echoes them to the log frame.
  """
  def apply_params(data, state) do
    Kino.Frame.render(state.log_frame, Kino.Text.new(inspect(data)))

    %{
      state
      | rps: data.rps,
        server_config: %ServerModel.Config{
          normal_latency: data.svc_latency,
          capacity_threshold: data.svc_degredation_capacity,
          max_capacity: data.svc_max_capacity,
          db_timeout: data.svc_timeout
        }
    }
  end

  @doc """
  Main receive loop: applies parameter changes, counts `:dropped` reports and
  runs a generation tick when no message arrives within the tick interval.
  """
  def loop(state) do
    receive do
      {:param_change, params} ->
        %{data: data} = params
        loop(apply_params(data, state))

      :dropped ->
        loop(%{state | dropped: state.dropped + 1})
    after
      @tick_ms ->
        state |> tick() |> loop()
    end
  end

  # Emits the requests that became due since the watermark and returns the new state.
  defp tick(state) do
    curr_ms = :os.system_time(:millisecond)
    dt_ms = curr_ms - state.last_watermark

    {dropped_val, dropped_ema} =
      TimestepEMA.update(state.dropped_ema, state.dropped, curr_ms)

    # NOTE(review): `dropped_val` is a smoothed per-tick drop count and the
    # `* 1000` factor is kept as-is; converting a per-tick count to a per-second
    # rate would presumably need the tick length -- confirm the intended unit.
    adjusted_rps = max(0, state.rps - dropped_val * 1000)
    load_state = ServerModel.simulate(adjusted_rps, state.server_config)

    # Only whole requests are sent. max/2 guards against a wall clock that
    # stepped backwards (negative dt_ms).
    requests = max(trunc(dt_ms / 1000 * state.rps), 0)

    # Advance the watermark by exactly the time the emitted requests account
    # for, so the fractional request left over is carried into the next tick
    # instead of being discarded.
    curr_watermark =
      if state.rps > 0 do
        state.last_watermark + requests / state.rps * 1000
      else
        curr_ms
      end

    spawn_requests(requests, load_state, self())

    %{
      state
      | last_watermark: curr_watermark,
        dropped: 0,
        dropped_ema: dropped_ema
    }
  end

  # Spawns exactly `count` request processes (none when `count` is 0); each
  # reports `:dropped` to `reply_to` when the regulator rejects it.
  defp spawn_requests(count, load_state, reply_to) do
    Enum.each(1..count//1, fn _n ->
      spawn(fn ->
        resp =
          Regulator.ask(:service, fn ->
            Process.sleep(trunc(load_state.latency))

            if :rand.uniform() <= load_state.success_probability do
              {:ok, :ok}
            else
              {:error, :error}
            end
          end)

        case resp do
          :dropped ->
            send(reply_to, :dropped)

          _ ->
            nil
        end
      end)
    end)
  end
end
defmodule StatsProcess do
  @moduledoc """
  Collects the outcome of every `Regulator.ask/2` call via telemetry and
  periodically pushes the ok / error / dropped ratios to a VegaLite chart.

  Each pushed point has `x` (seconds since the process started) and `y1`,
  `y2`, `y3` (ok, error and dropped ratio for the elapsed flush interval, or
  `nil` when no request finished in that interval).
  """

  defstruct [
    chart: nil,
    log_frame: nil,
    start_ms: nil,
    next_flush: nil,
    ok_count: 0,
    dropped_count: 0,
    error_count: 0,
    flush_interval: 300
  ]

  # Telemetry handler id; a fixed id means only one StatsProcess receives events at a time.
  @handler_id "r1"

  @doc """
  Spawns the stats collector linked to the caller and returns its pid.

  The third argument (the form) is accepted for call-site symmetry with
  `LoadProcess.start_link/3` but is not used.
  """
  def start_link(chart, log_frame, _form) do
    spawn_link(fn ->
      run(chart, log_frame)
    end)
  end

  @doc """
  Process body: attaches the telemetry handler and enters `loop/1`.
  """
  def run(chart, log_frame) do
    # A handler left over from a previous evaluation would make attach/4 return
    # {:error, :already_exists}; drop it first. detach/1 returns
    # {:error, :not_found} when there is nothing to remove, which is fine.
    _ = :telemetry.detach(@handler_id)

    # The handler is a remote capture (not an anonymous function), which is the
    # form telemetry recommends; the collector pid travels in the handler config.
    :ok =
      :telemetry.attach(
        @handler_id,
        [:regulator, :ask, :stop],
        &__MODULE__.handle_ask_stop/4,
        self()
      )

    now = :os.system_time(:millisecond)

    state = %__MODULE__{
      chart: chart,
      log_frame: log_frame,
      start_ms: now
    }

    loop(%{state | next_flush: now + state.flush_interval})
  end

  @doc false
  # Telemetry callback: runs in the process that called Regulator.ask/2 and
  # forwards the event metadata to the collector process.
  def handle_ask_stop(_event, _measurements, meta, stats_proc) do
    send(stats_proc, {:ask_stop, meta})
  end

  @doc """
  Main receive loop: tallies results until the next flush deadline, then
  pushes one data point and resets the counters.
  """
  def loop(state) do
    # Deadline-based timeout so a steady stream of events cannot postpone the flush.
    timeout_after = max(0, state.next_flush - :os.system_time(:millisecond))

    receive do
      {:ask_stop, meta} ->
        state |> count_result(meta.result) |> loop()
    after
      timeout_after ->
        state |> flush() |> loop()
    end
  end

  # Increments the counter matching the reported result.
  defp count_result(state, :ok), do: %{state | ok_count: state.ok_count + 1}
  defp count_result(state, :error), do: %{state | error_count: state.error_count + 1}
  defp count_result(state, :dropped), do: %{state | dropped_count: state.dropped_count + 1}
  # Any other result (e.g. an ignored sample) is not charted; skip it rather than crash.
  defp count_result(state, _other), do: state

  # Pushes the ratios for the elapsed interval and starts a new interval.
  defp flush(state) do
    elapsed_s = (:os.system_time(:millisecond) - state.start_ms) / 1000
    total = state.ok_count + state.error_count + state.dropped_count

    {ok_ratio, error_ratio, dropped_ratio} =
      if total == 0 do
        {nil, nil, nil}
      else
        {state.ok_count / total, state.error_count / total, state.dropped_count / total}
      end

    Kino.VegaLite.push(
      state.chart,
      %{x: elapsed_s, y1: ok_ratio, y2: error_ratio, y3: dropped_ratio},
      window: 100
    )

    %{
      state
      | ok_count: 0,
        error_count: 0,
        dropped_count: 0,
        next_flush: :os.system_time(:millisecond) + state.flush_interval
    }
  end
end
# Earlier single-series chart, kept for reference:
# chart =
#   Vl.new(width: 400, height: 400)
#   |> Vl.mark(:line)
#   |> Vl.encode_field(:x, "x", type: :quantitative)
#   |> Vl.encode_field(:y, "y", type: :quantitative)
#   |> Kino.VegaLite.render()

# Builds one line layer that plots `y_field` against the shared "x" field.
line_layer = fn y_field, mark_opts ->
  Vl.new()
  |> Vl.mark(:line, mark_opts)
  |> Vl.encode_field(:x, "x", type: :quantitative)
  |> Vl.encode_field(:y, y_field, type: :quantitative)
end

# "y1", "y2" and "y3" are the keys StatsProcess pushes: ok, error and dropped ratio.
series = [
  {"y1", []},
  {"y2", [color: "red"]},
  {"y3", [color: "gray"]}
]

layers = Enum.map(series, fn {y_field, mark_opts} -> line_layer.(y_field, mark_opts) end)

chart =
  Vl.new(width: 800, height: 400)
  |> Vl.layers(layers)
  |> Kino.VegaLite.render()

# Frame used by the processes below to echo the current form values.
log_frame = Kino.render(Kino.Frame.new())
nil

# Streaming demo from an earlier experiment, kept for reference:
# for i <- 1..300 do
#   point = %{x: i / 10, y: :math.sin(i / 10)}
#   Kino.VegaLite.push(chart, point)
#   Process.sleep(25)
# end

# Sliders for the load generator and the simulated service.
rps_input =
  Kino.Input.range("RPS", default: 100, min: 10, max: 500)

svc_latency_input =
  Kino.Input.range("Service Latency (ms)", default: 50, min: 2, max: 500)

svc_timeout_input =
  Kino.Input.range("Service Timeout (ms)", default: 2000, min: 1000, max: 5000)

svc_degredation_capacity_input =
  Kino.Input.range("Service Degredation Threshold (rps)", default: 120, min: 10, max: 500)

svc_max_capacity_input =
  Kino.Input.range("Service Max Capacity (rps)", default: 200, min: 10, max: 500)

# The keys below are the field names LoadProcess.apply_params/2 reads from the form data.
inputs = [
  rps: rps_input,
  svc_latency: svc_latency_input,
  svc_timeout: svc_timeout_input,
  svc_degredation_capacity: svc_degredation_capacity_input,
  svc_max_capacity: svc_max_capacity_input
]

# report_changes: true makes the form emit an event on every slider change.
form = Kino.Control.form(inputs, report_changes: true)

# Start the collector first so its pid can be handed to the load generator.
stats_process = StatsProcess.start_link(chart, log_frame, form)
load_process = LoadProcess.start_link(stats_process, log_frame, form)