Skip to content

Instantly share code, notes, and snippets.

@hansihe
Created December 13, 2024 09:19
Show Gist options
  • Save hansihe/4607b8ba1660aeaa4171753033f1a2ae to your computer and use it in GitHub Desktop.
Save hansihe/4607b8ba1660aeaa4171753033f1a2ae to your computer and use it in GitHub Desktop.

Regulator Testbench

Mix.install([
  {:regulator, "~> 0.6.0"},
  {:kino, "~> 0.14.2"},
  {:kino_vega_lite, "~> 0.1.9"},
  {:vega_lite, "~> 0.1.11"}
])

alias VegaLite, as: Vl

Section

# Alternative limit algorithm:
# {:ok, pid} = Regulator.install(:service, {Regulator.Limit.Gradient, []})

# Register the regulator named :service (the name LoadProcess asks through)
# using the AIMD limit algorithm.
# NOTE(review): `timeout: 100` is presumably AIMD's latency threshold in ms —
# confirm against the Regulator.Limit.AIMD docs.
aimd_limit = {Regulator.Limit.AIMD, [timeout: 100]}
{:ok, pid} = Regulator.install(:service, aimd_limit)
defmodule ServerModel do
  @moduledoc """
  Simulates web server behavior under different load conditions.
  Returns latency and success probability for a given request rate.

  Between `capacity_threshold` and `max_capacity` latency grows exponentially.
  Above `max_capacity` requests also start failing. Latency is always capped
  at `db_timeout`.
  """

  defmodule Config do
    @moduledoc "Server configuration parameters"
    defstruct [
      normal_latency: 50,        # Base latency in ms
      capacity_threshold: 100,    # Requests/sec where degradation starts
      max_capacity: 200,         # Requests/sec where failures begin
      db_timeout: 2000           # Database timeout in ms
    ]

    @type t :: %__MODULE__{
            normal_latency: number(),
            capacity_threshold: number(),
            max_capacity: number(),
            db_timeout: number()
          }
  end

  defmodule Response do
    @moduledoc "Simulation response structure"
    defstruct [:latency, :success_probability]

    @type t :: %__MODULE__{
            latency: number(),
            success_probability: float()
          }
  end

  # `:math.exp/1`, and the multiplication applied to its result, raise
  # ArithmeticError once the value overflows a double (exponent above ~709).
  # Latency is capped at `db_timeout` anyway, so clamping the exponent here
  # does not change the capped result for any realistic `db_timeout`.
  @max_exponent 700.0

  @doc """
  Simulates server behavior for a given requests per second (RPS).
  Returns a Response struct with latency and success probability.

  ## Parameters
    - rps: Requests per second
    - config: Optional Config struct with server parameters

  If `capacity_threshold >= max_capacity`, the degradation window is treated
  as 1 rps wide. Latency then climbs steeply (up to `db_timeout`) as soon as
  `rps` exceeds the threshold.

  ## Examples (latency rounded)
      ServerModel.simulate(150)
      %ServerModel.Response{latency: 135.91, success_probability: 1.0}

      ServerModel.simulate(250)
      %ServerModel.Response{latency: 1004.28, success_probability: 0.75}
  """
  @spec simulate(number(), Config.t()) :: Response.t()
  def simulate(rps, config \\ %Config{}) do
    # Start with base latency and 100% success
    initial = %Response{
      latency: config.normal_latency,
      success_probability: 1.0
    }

    # Apply degradation if over threshold
    with_degradation =
      if rps > config.capacity_threshold do
        %{initial | latency: degraded_latency(rps, config)}
      else
        initial
      end

    # Apply failures if over max capacity: success falls linearly and reaches
    # 0 at twice max_capacity.
    with_failures =
      if rps > config.max_capacity do
        overload = (rps - config.max_capacity) / config.max_capacity
        %{with_degradation | success_probability: max(0.0, 1.0 - overload)}
      else
        with_degradation
      end

    # Cap latency at database timeout
    %{with_failures | latency: min(with_failures.latency, config.db_timeout)}
  end

  # Latency for an `rps` above `capacity_threshold`. It grows as
  # `normal_latency * e^(2 * load)`, where `load` is how far `rps` has moved
  # through the degradation window (1.0 at `max_capacity`).
  defp degraded_latency(rps, config) do
    # The sliders allow capacity_threshold >= max_capacity. A zero-width window
    # would divide by zero, and an inverted one would push latency below
    # normal_latency, so fall back to a 1 rps wide window instead.
    window =
      case config.max_capacity - config.capacity_threshold do
        span when span > 0 -> span
        _ -> 1
      end

    load = (rps - config.capacity_threshold) / window
    config.normal_latency * :math.exp(min(load * 2, @max_exponent))
  end
end

defmodule TimestepEMA do
  @moduledoc """
  Implements Exponential Moving Average (EMA) with timestep compensation.
  This is useful for handling data streams with irregular time intervals.

  The weight given to a new value is `1 - (1 - decay_factor) ^ time_diff`.
  A gap of `n` time units therefore has the same effect as `n` consecutive
  one-unit updates with that value.
  """

  defstruct [:last_value, :last_timestamp, :decay_factor]

  @type t :: %__MODULE__{
    last_value: number() | nil,
    last_timestamp: integer() | nil,
    decay_factor: float()
  }

  @doc """
  Creates a new EMA calculator with the specified decay factor.
  The decay factor determines how quickly older values lose importance.

  ## Parameters
    * decay_factor: A value strictly between 0 and 1, where higher values give
      more weight to recent observations. Other values raise
      FunctionClauseError.
  """
  @spec new(float()) :: t()
  def new(decay_factor) when decay_factor > 0 and decay_factor < 1 do
    %__MODULE__{
      last_value: nil,
      last_timestamp: nil,
      decay_factor: decay_factor
    }
  end

  @doc """
  Updates the EMA with a new value and timestamp.

  ## Parameters
    * ema: The current EMA struct
    * value: The new value to incorporate (integer or float)
    * timestamp: The timestamp of the new value (in any consistent time unit)

  ## Returns
    A tuple containing the new EMA value and updated EMA struct.
    The first update returns `value` unchanged.
    A timestamp earlier than the previous one is treated as a zero-length gap,
    so the average stays where it is.
  """
  @spec update(t(), number(), integer()) :: {number(), t()}
  def update(%__MODULE__{last_value: nil} = ema, value, timestamp) do
    # First value initialization: nothing to smooth against yet.
    {value, %{ema | last_value: value, last_timestamp: timestamp}}
  end

  def update(%__MODULE__{} = ema, value, timestamp) do
    # Clamp the gap at zero. A timestamp going backwards (e.g. a wall clock
    # being stepped back) would otherwise give a negative weight and push the
    # average away from the new value.
    time_diff = max(timestamp - ema.last_timestamp, 0)
    adjusted_alpha = 1.0 - :math.pow(1.0 - ema.decay_factor, time_diff)

    new_ema_value = ema.last_value + adjusted_alpha * (value - ema.last_value)

    {new_ema_value, %{ema | last_value: new_ema_value, last_timestamp: timestamp}}
  end
end
defmodule LoadProcess do
  @moduledoc """
  Generates synthetic traffic against the `:service` regulator.

  On every tick the process works out how many requests are due at the
  configured RPS and spawns one process per request. Each request goes through
  `Regulator.ask/2`, sleeps for the latency predicted by
  `ServerModel.simulate/2`, and then succeeds with the predicted probability.
  Requests the regulator drops are reported back as `:dropped` messages. The
  resulting drop rate (requests/second) is subtracted from the RPS fed into
  the server model.
  """

  # Interval between load-generation ticks, in milliseconds.
  @tick_ms 20

  defstruct [
    parent: nil,
    log_frame: nil,
    # Time (ms) up to which requests have been issued; may be fractional.
    last_watermark: nil,
    # Time (ms) of the previous tick, used to turn drop counts into a rate.
    last_tick: nil,
    # Deadline (ms) for the next tick.
    next_tick: nil,
    rps: nil,
    server_config: nil,
    # Number of :dropped reports received since the previous tick.
    dropped: 0,
    # Smoothed drop rate in requests/second.
    dropped_ema: TimestepEMA.new(0.7)
  ]

  @doc "Spawns a linked process running `run/3` and returns its pid."
  def start_link(parent, log_frame, form) do
    spawn_link(fn ->
      run(parent, log_frame, form)
    end)
  end

  @doc """
  Subscribes to `form` changes, waits for the first set of parameters, then
  enters the tick loop.
  """
  def run(parent, log_frame, form) do
    state = %__MODULE__{
      parent: parent,
      log_frame: log_frame
    }

    Kino.Control.subscribe(form, :param_change)

    # Receive initial state
    receive do
      {:param_change, params} ->
        %{data: data} = params
        state = apply_params(data, state)
        # Start the clocks only once parameters are known. Otherwise the time
        # spent waiting here would be issued as a burst of "overdue" requests.
        now = :os.system_time(:millisecond)
        loop(%{state | last_watermark: now, last_tick: now, next_tick: now + @tick_ms})
    end
  end

  @doc """
  Renders the raw form `data` into the log frame and copies the form values
  into `state`: the offered RPS and a `ServerModel.Config`.
  """
  def apply_params(data, state) do
    Kino.Frame.render(state.log_frame, Kino.Text.new(inspect(data)))

    %{
      state |
      rps: data.rps,
      server_config: %ServerModel.Config{
        normal_latency: data.svc_latency,
        capacity_threshold: data.svc_degredation_capacity,
        max_capacity: data.svc_max_capacity,
        db_timeout: data.svc_timeout
      }
    }
  end

  @doc "Main loop: applies parameter changes, counts drops and runs ticks on schedule."
  def loop(state) do
    # Wait only until the next tick's deadline. A fixed `after 20` would
    # restart on every message, so a steady stream of :dropped reports could
    # postpone the tick indefinitely and stall load generation.
    timeout = max(0, state.next_tick - :os.system_time(:millisecond))

    receive do
      {:param_change, params} ->
        %{data: data} = params
        loop(apply_params(data, state))

      :dropped ->
        loop(%{state | dropped: state.dropped + 1})
    after
      timeout ->
        loop(tick(state))
    end
  end

  # Runs one load-generation step and returns the updated state.
  defp tick(state) do
    curr_ms = :os.system_time(:millisecond)

    # Drops were counted since the previous tick. Convert the count into
    # requests/second before subtracting it from the offered RPS.
    since_last_tick_ms = max(curr_ms - state.last_tick, 1)
    drop_rate = state.dropped * 1000 / since_last_tick_ms

    {dropped_rps, dropped_ema} = TimestepEMA.update(state.dropped_ema, drop_rate, curr_ms)
    adjusted_rps = max(0, state.rps - dropped_rps)

    load_state = ServerModel.simulate(adjusted_rps, state.server_config)

    {requests, next_watermark} = due_requests(state.last_watermark, curr_ms, state.rps)
    spawn_requests(requests, load_state)

    %{
      state
      | last_watermark: next_watermark,
        last_tick: curr_ms,
        next_tick: curr_ms + @tick_ms,
        dropped: 0,
        dropped_ema: dropped_ema
    }
  end

  # Returns `{count, new_watermark}`. `count` is the whole number of requests
  # due between `watermark` and `now_ms` at `rps`. The watermark advances only
  # by the time those requests account for, so the fractional remainder stays
  # owed and is issued on a later tick instead of being lost.
  defp due_requests(watermark, now_ms, rps) when rps > 0 do
    count = trunc((now_ms - watermark) / 1000 * rps)
    {count, watermark + count * 1000 / rps}
  end

  # No rate means nothing is due; just catch the watermark up.
  defp due_requests(_watermark, now_ms, _rps), do: {0, now_ms}

  # Spawns `count` unlinked request processes (`1..count//1` is empty when
  # `count <= 0`). Each one asks the regulator, sleeps for the simulated
  # latency, and reports :dropped results back to the calling process.
  defp spawn_requests(count, load_state) do
    load_process = self()

    for _n <- 1..count//1 do
      spawn(fn ->
        resp = Regulator.ask(:service, fn ->
          Process.sleep(trunc(load_state.latency))
          if :rand.uniform() <= load_state.success_probability do
            {:ok, :ok}
          else
            {:error, :error}
          end
        end)

        case resp do
          :dropped ->
            send(load_process, :dropped)

          _ ->
            nil
        end
      end)
    end

    :ok
  end
end

defmodule StatsProcess do
  @moduledoc """
  Collects `[:regulator, :ask, :stop]` telemetry events. Every
  `flush_interval` ms it pushes the share of ok / error / dropped results seen
  in that interval to a `Kino.VegaLite` chart.
  """

  # Telemetry handler id. Keeping it fixed lets a re-run replace the old handler.
  @handler_id "r1"

  defstruct [
    chart: nil,
    log_frame: nil,
    start_ms: nil,
    next_flush: nil,
    ok_count: 0,
    dropped_count: 0,
    error_count: 0,
    flush_interval: 300
  ]

  @doc "Spawns a linked process running `run/2` and returns its pid. `form` is currently unused."
  def start_link(chart, log_frame, _form) do
    spawn_link(fn ->
      run(chart, log_frame)
    end)
  end

  @doc """
  Attaches the telemetry handler, which forwards events to this process, and
  enters the flush loop.
  """
  def run(chart, log_frame) do
    # `:telemetry.attach/4` returns {:error, :already_exists} while the id is
    # still registered (e.g. after re-evaluating the cell), which would make
    # the `:ok =` match below crash. Remove any stale handler first.
    _ = :telemetry.detach(@handler_id)

    :ok =
      :telemetry.attach(
        @handler_id,
        [:regulator, :ask, :stop],
        &__MODULE__.handle_event/4,
        self()
      )

    start_ms = :os.system_time(:millisecond)
    state = %__MODULE__{chart: chart, log_frame: log_frame, start_ms: start_ms}
    loop(%{state | next_flush: start_ms + state.flush_interval})
  end

  @doc false
  # Telemetry handler: forwards the event metadata to the stats process whose
  # pid was registered as the handler config.
  def handle_event(_name, _measurements, meta, stats_proc) do
    send(stats_proc, {:ask_stop, meta})
  end

  @doc "Counts results as they arrive and flushes the ratios to the chart on schedule."
  def loop(state) do
    timeout_after = max(0, state.next_flush - :os.system_time(:millisecond))

    receive do
      {:ask_stop, meta} ->
        loop(count_result(state, meta.result))
    after
      timeout_after ->
        loop(flush(state))
    end
  end

  # Increments the counter for one result. Any other result value is ignored
  # instead of crashing the process.
  defp count_result(state, :ok), do: %{state | ok_count: state.ok_count + 1}
  defp count_result(state, :error), do: %{state | error_count: state.error_count + 1}
  defp count_result(state, :dropped), do: %{state | dropped_count: state.dropped_count + 1}
  defp count_result(state, _other), do: state

  # Pushes this interval's result ratios to the chart (nil when nothing was
  # recorded), resets the counters and schedules the next flush.
  defp flush(state) do
    curr_time = (:os.system_time(:millisecond) - state.start_ms) / 1000
    total = state.ok_count + state.error_count + state.dropped_count

    {ok_ratio, error_ratio, dropped_ratio} =
      if total == 0 do
        {nil, nil, nil}
      else
        {state.ok_count / total, state.error_count / total, state.dropped_count / total}
      end

    Kino.VegaLite.push(
      state.chart,
      %{x: curr_time, y1: ok_ratio, y2: error_ratio, y3: dropped_ratio},
      window: 100
    )

    %{
      state
      | ok_count: 0,
        error_count: 0,
        dropped_count: 0,
        next_flush: :os.system_time(:millisecond) + state.flush_interval
    }
  end
end
#chart =
#  Vl.new(width: 400, height: 400)
#  |> Vl.mark(:line)
#  |> Vl.encode_field(:x, "x", type: :quantitative)
#  |> Vl.encode_field(:y, "y", type: :quantitative)
#  |> Kino.VegaLite.render()
# One line layer per series, all plotted against "x":
# y1 = ok ratio (default colour), y2 = error ratio (red),
# y3 = dropped ratio (gray), as pushed by StatsProcess.
chart =
  Vl.new(width: 800, height: 400)
  |> Vl.layers(
    Enum.map(
      [{"y1", []}, {"y2", [color: "red"]}, {"y3", [color: "gray"]}],
      fn {y_field, mark_opts} ->
        Vl.new()
        |> Vl.mark(:line, mark_opts)
        |> Vl.encode_field(:x, "x", type: :quantitative)
        |> Vl.encode_field(:y, y_field, type: :quantitative)
      end
    )
  )
  |> Kino.VegaLite.render()

# Frame where LoadProcess shows the latest form parameters.
log_frame = Kino.Frame.new() |> Kino.render()

# Cell result; presumably nil so the frame is not rendered a second time.
nil

#for i <- 1..300 do
#  point = %{x: i / 10, y: :math.sin(i / 10)}
#  Kino.VegaLite.push(chart, point)
#  Process.sleep(25)
#end

# Sliders for the offered load and the ServerModel parameters. The keyword
# keys in `inputs` become the keys of the form's `data` map, which
# LoadProcess.apply_params/2 reads.
rps_input = Kino.Input.range("RPS", default: 100, min: 10, max: 500)
svc_latency_input = Kino.Input.range("Service Latency (ms)", default: 50, min: 2, max: 500)
svc_timeout_input = Kino.Input.range("Service Timeout (ms)", default: 2000, min: 1000, max: 5000)
# The user-facing label is spelled correctly. The `svc_degredation_capacity`
# key keeps its historical spelling because LoadProcess.apply_params/2 reads
# it by that name.
svc_degredation_capacity_input = Kino.Input.range("Service Degradation Threshold (rps)", default: 120, min: 10, max: 500)
svc_max_capacity_input = Kino.Input.range("Service Max Capacity (rps)", default: 200, min: 10, max: 500)

inputs = [
  rps: rps_input,
  svc_latency: svc_latency_input,
  svc_timeout: svc_timeout_input,
  svc_degredation_capacity: svc_degredation_capacity_input,
  svc_max_capacity: svc_max_capacity_input
]

# Emit a :param_change event on every slider change (see LoadProcess).
form = Kino.Control.form(inputs, report_changes: true)
# Start the stats collector, then the load generator. The stats pid is passed
# as LoadProcess's `parent`, which LoadProcess stores but does not otherwise use.
stats_process = StatsProcess.start_link(chart, log_frame, form)
load_process = LoadProcess.start_link(stats_process, log_frame, form)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment