Skip to content

Instantly share code, notes, and snippets.

@xtreme-sameer-vohra
Last active January 14, 2025 18:51
Show Gist options
  • Save xtreme-sameer-vohra/ec302ffee9c44badc8a9051e316fc56f to your computer and use it in GitHub Desktop.
Save xtreme-sameer-vohra/ec302ffee9c44badc8a9051e316fc56f to your computer and use it in GitHub Desktop.
Elixir OpenAI API Perf Test
defmodule OpenAIAPI do
  @moduledoc """
  Ad-hoc latency benchmark for `OpenAIClient`.

  Sends the same prompt several times through `OpenAIClient.stream/1` and
  `OpenAIClient.response/1`, then prints the average, minimum, maximum and
  (population) standard deviation of the measured times in milliseconds.

  The streaming measurement expects the calling process to receive
  `{:streaming_response, {:text, content}}` messages for tokens and a final
  `{:streaming_done}` message.
  """

  require Logger

  # Pause after a batch of samples so the two test modes do not run back to back.
  @cooldown_ms 1000

  # Longest wait for the next stream message before a sample is abandoned.
  @stream_timeout_ms 30_000

  @doc """
  Runs the streaming benchmark followed by the synchronous benchmark and
  prints the statistics of both to standard output.

  `num_samples` is the number of requests issued per mode (default `10`).
  A value below `1` issues no requests and prints empty result sections.

  Returns `:ok`.
  """
  @spec run_tests(integer()) :: :ok
  def run_tests(num_samples \\ 10) do
    # The prompt is fixed so that every run measures the same request.
    test_messages = [
      %{"role" => "user", "content" => "Write me a haiku about Toronto"}
    ]

    IO.puts("\n=== Performance Test Results ===\n")

    streaming_results_openai = test_streaming(test_messages, num_samples)
    sync_results_openai = test_sync_response(test_messages, num_samples)

    print_results("Streaming", streaming_results_openai)
    print_results("Sync", sync_results_openai)
  end

  # Collects `num_samples` streaming measurements as
  # `%{first_token: ms | nil, complete: ms}` maps.
  defp test_streaming(messages, num_samples) do
    collect_samples(num_samples, fn ->
      {first_token_time, complete_time} = measure_streaming(messages)
      %{first_token: first_token_time, complete: complete_time}
    end)
  end

  # Collects `num_samples` synchronous measurements as `%{complete: ms}` maps.
  defp test_sync_response(messages, num_samples) do
    collect_samples(num_samples, fn ->
      {time, _result} = measure_sync_response(messages)
      %{complete: time}
    end)
  end

  # Calls `sample_fun` `num_samples` times and returns the results in order,
  # then sleeps for the cool-down period. Counts below 1 yield `[]` right away:
  # `1..0` is a descending range in Elixir and would otherwise run two samples.
  defp collect_samples(num_samples, _sample_fun)
       when is_integer(num_samples) and num_samples < 1,
       do: []

  defp collect_samples(num_samples, sample_fun) when is_integer(num_samples) do
    results = Enum.map(1..num_samples, fn _ -> sample_fun.() end)
    Process.sleep(@cooldown_ms)
    results
  end

  # Starts a streaming request and measures it.
  # Returns `{first_token_ms | nil, complete_ms}` relative to the request start.
  defp measure_streaming(messages) do
    start_time = System.monotonic_time(:millisecond)
    OpenAIClient.stream(messages)
    receive_stream(start_time, nil)
  end

  # Consumes stream messages from the mailbox until `{:streaming_done}` arrives
  # or no message shows up within the timeout. The timeout applies to each wait
  # for the next message, not to the stream as a whole.
  defp receive_stream(start_time, first_token_time) do
    receive do
      {:streaming_response, {:text, _content}} ->
        # Only the first token sets the timestamp; later tokens keep it.
        receive_stream(start_time, first_token_time || System.monotonic_time(:millisecond))

      {:streaming_done} ->
        finish_stream(start_time, first_token_time)
    after
      @stream_timeout_ms ->
        Logger.warning(
          "Stream produced no message for #{@stream_timeout_ms} ms; recording sample as timed out"
        )

        finish_stream(start_time, first_token_time)
    end
  end

  # Builds the `{first_token_ms | nil, complete_ms}` result for a finished stream.
  defp finish_stream(start_time, first_token_time) do
    complete_time = System.monotonic_time(:millisecond)
    {elapsed(start_time, first_token_time), complete_time - start_time}
  end

  # A stream may end (or time out) before any token was seen; report that as
  # `nil` rather than crashing on `nil - start_time`.
  defp elapsed(_start_time, nil), do: nil
  defp elapsed(start_time, time), do: time - start_time

  # Times one blocking request. Returns `{elapsed_ms, result}`.
  defp measure_sync_response(messages) do
    start_time = System.monotonic_time(:millisecond)
    result = OpenAIClient.response(messages)
    end_time = System.monotonic_time(:millisecond)
    {end_time - start_time, result}
  end

  # Prints the statistics for one test mode. The first-token section is only
  # printed for results that carry a `:first_token` key (streaming results).
  defp print_results(test_name, results) do
    IO.puts("\n#{test_name}:")

    if Enum.any?(results, &Map.has_key?(&1, :first_token)) do
      # Samples without any token carry `nil` and are left out of the statistics.
      first_token_times = for %{first_token: time} <- results, not is_nil(time), do: time
      print_stats("First Token Times (ms):", first_token_times)
    end

    complete_times = Enum.map(results, & &1.complete)
    print_stats("Complete Response Times (ms):", complete_times)
  end

  # Prints a titled statistics section; an empty list gets a placeholder line
  # because min/max/average are undefined for it.
  defp print_stats(title, []) do
    IO.puts(title)
    IO.puts(" No samples collected")
  end

  defp print_stats(title, times) do
    IO.puts(title)
    IO.puts(" Average: #{average(times)}")
    IO.puts(" Min: #{Enum.min(times)}")
    IO.puts(" Max: #{Enum.max(times)}")
    IO.puts(" Std Dev: #{std_dev(times)}")
  end

  # Arithmetic mean of a non-empty list of numbers.
  defp average(list), do: Enum.sum(list) / length(list)

  # Population standard deviation (divides by N) of a non-empty list of numbers.
  defp std_dev(list) do
    avg = average(list)

    variance =
      Enum.reduce(list, 0, fn x, acc ->
        acc + :math.pow(x - avg, 2)
      end) / length(list)

    :math.sqrt(variance)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment