Skip to content

Instantly share code, notes, and snippets.

@Kavignon
Created July 3, 2025 12:58
Show Gist options
  • Save Kavignon/39d3c4ce4401c27dc427f874d21e3139 to your computer and use it in GitHub Desktop.
Save Kavignon/39d3c4ce4401c27dc427f874d21e3139 to your computer and use it in GitHub Desktop.
Lets you connect to your database and monitor queries made, slow queries, long queue waits, and connection errors.
defmodule DatabasePoolMonitor do
  @moduledoc """
  Monitors and logs PostgreSQL pool behavior during tests using telemetry.

  `setup/2` attaches a telemetry handler for a repo and keeps running counters
  in a named, public ETS table; `print_summary/2` prints those counters.

  Notes for maintainers:

    * Telemetry handlers run in the process that emits the event, so several
      processes may update the counters at the same time. Every counter is
      therefore its own ETS row, incremented atomically with
      `:ets.update_counter/4`.
    * The ETS table is owned by the process that calls `setup/2` and is
      destroyed when that process exits. Call `setup/2` from a process that
      outlives the test run (for example from `test/test_helper.exs`).
    * Queue waits are read from the `:queue_time` measurement of the repo
      `:query` event. The `:checkout` and `:queue` events are still subscribed
      to for setups that emit them. If a setup emits both a `:queue` event and
      a `:queue_time` measurement for the same wait, that wait is counted twice.
  """

  # Thresholds in milliseconds; a measurement must be strictly greater to count.
  @slow_query_ms 500
  @slow_checkout_ms 200
  @long_queue_ms 250

  # Every counter kept in the ETS table, one `{name, count}` row each.
  @counters [
    :connection_errors,
    :total_checkouts,
    :slow_checkouts,
    :total_queries,
    :slow_queries,
    :long_queues
  ]

  @doc """
  Creates the counters table for `repo` (if needed) and attaches the handler.

  ## Options

    * `:telemetry_prefix` - list of atoms used as the event prefix. Defaults to
      `repo.config()[:telemetry_prefix]`, and then to the repo module name split
      on dots and underscored (`MyApp.Repo` becomes `[:my_app, :repo]`).

  Returns the result of `:telemetry.attach_many/4`, i.e. `:ok`, or
  `{:error, :already_exists}` when a handler for the same prefix is attached.
  """
  @spec setup(module(), keyword()) :: :ok | {:error, :already_exists}
  def setup(repo, opts \\ []) do
    table = table_name(repo)
    ensure_table(table)

    ns = opts[:telemetry_prefix] || repo.config()[:telemetry_prefix] || default_prefix(repo)

    :telemetry.attach_many(
      "test-pool-monitor-#{Enum.join(ns, "-")}",
      [ns ++ [:query], ns ++ [:checkout], ns ++ [:queue]],
      # An external capture instead of an anonymous function: :telemetry warns
      # about local-function handlers.
      &__MODULE__.handle_event/4,
      %{table: table}
    )
  end

  @doc """
  Prints the counters collected for `repo` to standard output.

  ## Options

    * `:table` - name of the ETS table to read instead of the one derived
      from `repo`.

  Counters that were never incremented are reported as `0`. Raises
  `ArgumentError` if the table does not exist.
  """
  @spec print_summary(module(), keyword()) :: :ok
  def print_summary(repo, opts \\ []) do
    table = opts[:table] || table_name(repo)
    acc = Map.new(@counters, fn counter -> {counter, read_counter(table, counter)} end)

    IO.puts("""
    === Pool Monitor Summary for #{inspect(repo)} ===
    Total queries: #{acc.total_queries}
    Slow queries (>#{@slow_query_ms}ms): #{acc.slow_queries}
    Total checkouts: #{acc.total_checkouts}
    Slow checkouts (>#{@slow_checkout_ms}ms): #{acc.slow_checkouts}
    Long queue waits (>#{@long_queue_ms}ms): #{acc.long_queues}
    Connection pool errors: #{acc.connection_errors}
    ============================
    """)
  end

  # Telemetry callback. Public only so it can be attached as an external
  # capture; not part of the documented API.
  @doc false
  def handle_event(event, measurements, metadata, %{table: table}) do
    # Dispatch on the last segment only, so prefixes of any length and shape
    # (not just `[app, :repo]`) are handled instead of crashing the handler.
    case List.last(event) do
      :query -> handle_query(measurements, metadata, table)
      :checkout -> handle_checkout(measurements, table)
      :queue -> handle_queue(measurements, table)
      _other -> :ok
    end
  end

  # Counts a query, flags it when slow, and inspects queue wait and result.
  defp handle_query(measurements, metadata, table) do
    bump(table, :total_queries)

    if ms = exceeded_ms(measurements[:total_time], @slow_query_ms) do
      # Bracket access: a missing key prints as nil rather than raising, which
      # would make :telemetry detach the handler.
      IO.puts("""
      ⚠️ SLOW QUERY (#{ms} ms)
      ├── SQL: #{inspect(metadata[:query])}
      ├── Params: #{inspect(metadata[:params])}
      └── Source: #{inspect(metadata[:source])}
      """)

      bump(table, :slow_queries)
    end

    # Time spent waiting for a pooled connection arrives with the query event.
    report_long_queue(measurements[:queue_time], table)
    maybe_log_connection_error(metadata, table)
  end

  # Counts a checkout and flags it when it took longer than the threshold.
  defp handle_checkout(measurements, table) do
    bump(table, :total_checkouts)

    if ms = exceeded_ms(measurements[:duration], @slow_checkout_ms) do
      IO.puts("⏱️ Slow DB checkout (#{ms} ms)")
      bump(table, :slow_checkouts)
    end

    :ok
  end

  # Handles a dedicated queue event carrying a `:duration` measurement.
  defp handle_queue(measurements, table) do
    report_long_queue(measurements[:duration], table)
  end

  # Logs and counts a queue wait above the threshold; no-op otherwise.
  defp report_long_queue(native_time, table) do
    if ms = exceeded_ms(native_time, @long_queue_ms) do
      IO.puts("🚨 Queue wait exceeded #{@long_queue_ms}ms: #{ms}ms")
      bump(table, :long_queues)
    end

    :ok
  end

  # Logs and counts pool checkout timeouts reported with the event metadata.
  defp maybe_log_connection_error(metadata, table) do
    # The failure is read from `result: {:error, exception}`; a bare `:error`
    # key is still honoured for callers that supply one.
    error =
      case metadata do
        %{result: {:error, reason}} -> reason
        _ -> metadata[:error]
      end

    if pool_timeout?(error) do
      IO.puts("🔴 Connection pool timeout detected!")
      bump(table, :connection_errors)
    end

    :ok
  end

  # True for a DBConnection.ConnectionError whose message reports that no
  # connection was available.
  defp pool_timeout?(error) when is_struct(error, DBConnection.ConnectionError) do
    String.contains?(Exception.message(error), "connection not available")
  end

  defp pool_timeout?(_other), do: false

  # Converts a native-unit duration to milliseconds and returns it only when
  # it is strictly above `threshold_ms`; returns nil otherwise (or when the
  # measurement is absent).
  defp exceeded_ms(native_time, threshold_ms) when is_integer(native_time) do
    ms = System.convert_time_unit(native_time, :native, :millisecond)
    if ms > threshold_ms, do: ms
  end

  defp exceeded_ms(_missing, _threshold_ms), do: nil

  # Atomically increments `counter`, creating the row at 0 first if needed.
  defp bump(table, counter) do
    :ets.update_counter(table, counter, 1, {counter, 0})
    :ok
  end

  # Reads one counter, treating a missing row as 0.
  defp read_counter(table, counter) do
    case :ets.lookup(table, counter) do
      [{^counter, value}] -> value
      [] -> 0
    end
  end

  # Creates the named table with all counters at 0 unless it already exists.
  defp ensure_table(table) do
    if :ets.whereis(table) == :undefined do
      :ets.new(table, [:named_table, :public, write_concurrency: true])
      :ets.insert(table, Enum.map(@counters, &{&1, 0}))
    end

    :ok
  end

  # ETS table name derived from the last segment of the repo module name.
  defp table_name(repo) do
    :"test_pool_monitor_#{repo |> Module.split() |> List.last() |> String.downcase()}"
  end

  # Fallback event prefix: every segment of the repo module name, underscored
  # (`MyApp.Repo` -> `[:my_app, :repo]`). The atoms come from a module name, so
  # the set is bounded.
  defp default_prefix(repo) do
    repo
    |> Module.split()
    |> Enum.map(fn segment -> segment |> Macro.underscore() |> String.to_atom() end)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment